diff --git a/README.md b/README.md index cb6d88c3..0ac58358 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ EXAMPLES $ sf data bulk results --job-id 7507i000fake341G --target-org my-scratch ``` -_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/bulk/results.ts)_ +_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/bulk/results.ts)_ ## `sf data create file` @@ -193,7 +193,7 @@ EXAMPLES $ sf data create file --file path/to/astro.png --parent-id a03fakeLoJWPIA3 ``` -_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/create/file.ts)_ +_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/file.ts)_ ## `sf data create record` @@ -249,7 +249,7 @@ EXAMPLES TracedEntityId=01p17000000R6bLAAS" ``` -_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/create/record.ts)_ +_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/record.ts)_ ## `sf data delete bulk` @@ -308,7 +308,7 @@ FLAG DESCRIPTIONS and can be enabled only by a system administrator. 
``` -_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/bulk.ts)_ +_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/bulk.ts)_ ## `sf data delete record` @@ -369,7 +369,7 @@ EXAMPLES $ sf data delete record --use-tooling-api --sobject TraceFlag --record-id 7tf8c ``` -_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/record.ts)_ +_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/record.ts)_ ## `sf data delete resume` @@ -408,7 +408,7 @@ EXAMPLES $ sf data delete resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/resume.ts)_ +_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/resume.ts)_ ## `sf data export bulk` @@ -475,7 +475,7 @@ EXAMPLES --result-format json --wait 10 --all-rows ``` -_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/bulk.ts)_ +_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/bulk.ts)_ ## `sf data export resume` @@ -514,7 +514,7 @@ EXAMPLES $ sf data export resume --use-most-recent ``` -_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/resume.ts)_ +_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/resume.ts)_ ## `sf data export tree` @@ -574,7 +574,7 @@ EXAMPLES my-scratch ``` -_See code: 
[src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/tree.ts)_ +_See code: [src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/tree.ts)_ ## `sf data get record` @@ -638,7 +638,7 @@ EXAMPLES $ sf data get record --use-tooling-api --sobject TraceFlag --record-id 7tf8c ``` -_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/get/record.ts)_ +_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/get/record.ts)_ ## `sf data import bulk` @@ -690,7 +690,7 @@ EXAMPLES $ sf data import bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch ``` -_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/bulk.ts)_ +_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/bulk.ts)_ ## `sf data import resume` @@ -726,7 +726,7 @@ EXAMPLES $ sf data import resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/resume.ts)_ +_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/resume.ts)_ ## `sf data import tree` @@ -790,7 +790,7 @@ FLAG DESCRIPTIONS - files(array) - Files: An array of files paths to load ``` -_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/tree.ts)_ +_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/tree.ts)_ ## `sf data query` @@ -843,7 +843,7 @@ EXAMPLES $ sf data query --query "SELECT 
Name FROM ApexTrigger" --use-tooling-api ``` -_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/query.ts)_ +_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/query.ts)_ ## `sf data resume` @@ -880,7 +880,7 @@ EXAMPLES $ sf data resume --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA ``` -_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/resume.ts)_ +_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/resume.ts)_ ## `sf data search` @@ -930,7 +930,7 @@ EXAMPLES $ sf data search --file query.txt --target-org my-scratch --result-format csv ``` -_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/search.ts)_ +_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/search.ts)_ ## `sf data update bulk` @@ -985,7 +985,7 @@ EXAMPLES $ sf data update bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch ``` -_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/bulk.ts)_ +_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/bulk.ts)_ ## `sf data update record` @@ -1047,7 +1047,7 @@ EXAMPLES "ExpirationDate=2017-12-01T00:58:04.000+0000" ``` -_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/record.ts)_ +_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/record.ts)_ ## `sf data update resume` @@ -1086,7 +1086,7 @@ EXAMPLES $ sf data update resume --use-most-recent 
``` -_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/resume.ts)_ +_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/resume.ts)_ ## `sf data upsert bulk` @@ -1142,7 +1142,7 @@ EXAMPLES my-scratch ``` -_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/upsert/bulk.ts)_ +_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/bulk.ts)_ ## `sf data upsert resume` @@ -1181,7 +1181,7 @@ EXAMPLES $ sf data upsert resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/upsert/resume.ts)_ +_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/resume.ts)_ ## `sf force data bulk delete` @@ -1228,7 +1228,7 @@ EXAMPLES $ sf force data bulk delete --sobject MyObject__c --file files/delete.csv --wait 5 --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/delete.ts)_ +_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/delete.ts)_ ## `sf force data bulk status` @@ -1265,7 +1265,7 @@ EXAMPLES $ sf force data bulk status --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/status.ts)_ +_See code: 
[src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/status.ts)_ ## `sf force data bulk upsert` @@ -1323,6 +1323,6 @@ EXAMPLES --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/upsert.ts)_ +_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/upsert.ts)_ diff --git a/package.json b/package.json index 0b50a21a..cebd8ad6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@salesforce/plugin-data", - "version": "4.0.75", + "version": "4.0.74-dev.0", "description": "Plugin for salesforce data commands", "author": "Salesforce", "homepage": "https://github.com/salesforcecli/plugin-data", @@ -135,6 +135,7 @@ "csv-stringify": "^6.6.0", "form-data": "^4.0.5", "terminal-link": "^3.0.0", + "undici": "^7.22.0", "zod": "^4.3.6" }, "devDependencies": { diff --git a/src/bulkUtils.ts b/src/bulkUtils.ts index d9169804..8d407dcb 100644 --- a/src/bulkUtils.ts +++ b/src/bulkUtils.ts @@ -14,12 +14,12 @@ * limitations under the License. 
*/ -import { Transform, Readable } from 'node:stream'; +import { Transform, Readable, TransformCallback } from 'node:stream'; import { createInterface } from 'node:readline'; import { pipeline } from 'node:stream/promises'; import * as fs from 'node:fs'; import { EOL } from 'node:os'; -import { HttpApi } from '@jsforce/jsforce-node/lib/http-api.js'; +import { fetch } from 'undici'; import { HttpResponse } from '@jsforce/jsforce-node'; import { IngestJobV2Results, @@ -75,28 +75,98 @@ export enum ColumnDelimiter { export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter; -async function bulkRequest(conn: Connection, url: string): Promise<{ body: string; headers: HttpResponse['headers'] }> { - const httpApi = new HttpApi(conn, { - responseType: 'text/plain', // this ensures jsforce doesn't try parsing the body - }); +/** + * Transform stream that skips the first line of CSV data (the header row). + * Used when processing subsequent bulk result pages to avoid duplicate headers. + * + * Optimized to work directly with Buffers without string conversion for better memory efficiency. 
+ */ +export class SkipFirstLineTransform extends Transform { + private firstLineSkipped = false; + private buffer: Buffer = Buffer.alloc(0); + + public constructor() { + super(); + } + + public _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void { + if (this.firstLineSkipped) { + // After first line is skipped, pass through all subsequent data + callback(null, chunk); + return; + } - let headers: HttpResponse['headers'] | undefined; + // Buffer incoming data until we find the first newline + // Work directly with Buffers to avoid string conversion overhead + this.buffer = Buffer.concat([this.buffer, chunk]); - httpApi.on('response', (response: HttpResponse) => { - headers = response.headers; - }); + // Find newline byte (0x0A for \n) + const newlineIndex = this.buffer.indexOf(0x0a); + + if (newlineIndex === -1) { + // No newline yet, keep buffering + callback(); + return; + } + + // Found the newline, skip everything up to and including it + const remainingData = this.buffer.subarray(newlineIndex + 1); + this.firstLineSkipped = true; + this.buffer = Buffer.alloc(0); // Clear buffer to free memory + + callback(null, remainingData); + } + + public _flush(callback: TransformCallback): void { + // If we reach the end without finding a newline, clear buffer and finish + this.buffer = Buffer.alloc(0); + callback(); + } +} - const body = await httpApi.request({ - url: conn.normalizeUrl(url), +async function bulkRequest( + conn: Connection, + url: string +): Promise<{ stream: Readable; headers: HttpResponse['headers'] }> { + // Bypass jsforce entirely and use undici fetch to avoid any buffering. + // jsforce's Transport.httpRequest() adds a 'complete' listener which triggers readAll() buffering. + // Using undici fetch directly gives us the raw response stream without any intermediate buffering. 
+ + const normalizedUrl = conn.normalizeUrl(url); + + // Prepare request headers with authorization + const headers: { [name: string]: string } = { + 'Content-Type': 'text/csv', + }; + + // ensure undici gets a valid access token + await conn.refreshAuth(); + + if (conn.accessToken) { + headers.Authorization = `Bearer ${conn.accessToken}`; + } + + const response = await fetch(normalizedUrl, { method: 'GET', + headers, }); - if (!headers) throw new Error('failed to get HTTP headers for bulk query'); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } - return { - body, - headers, - }; + if (!response.body) { + throw new Error('No body was returned'); + } + const stream = Readable.fromWeb(response.body); + + // Extract headers in the format jsforce expects + const responseHeaders: HttpResponse['headers'] = {}; + response.headers.forEach((value: string, key: string) => { + responseHeaders[key] = value; + }); + + return { stream, headers: responseHeaders }; } export async function exportRecords( @@ -151,7 +221,7 @@ export async function exportRecords( // eslint-disable-next-line no-await-in-loop await pipeline( - Readable.from(res.body), + res.stream, new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }), new Transform({ objectMode: true, @@ -173,18 +243,15 @@ await pipeline( locator ? [ - // Skip the 1st row (CSV header) by finding the index of the first `LF` - // occurence and move the position 1 char ahead. - // - // CSVs using `CRLF` are still handled correctly because `CR` and `LF` are different chars in the string. - Readable.from(res.body.slice(res.body.indexOf('\n') + 1)), + res.stream, + new SkipFirstLineTransform(), + fs.createWriteStream(outputInfo.filePath, { // Open file for appending. The file is created if it does not exist. 
// https://nodejs.org/api/fs.html#file-system-flags flags: 'a', // append mode }), ] - : [Readable.from(res.body), fs.createWriteStream(outputInfo.filePath)] + : [res.stream, fs.createWriteStream(outputInfo.filePath)] ); } diff --git a/test/bulkUtils.test.ts b/test/bulkUtils.test.ts index 820bde3f..d53812a8 100644 --- a/test/bulkUtils.test.ts +++ b/test/bulkUtils.test.ts @@ -14,9 +14,12 @@ * limitations under the License. */ +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { expect } from 'chai'; -import { detectDelimiter } from '../src/bulkUtils.js'; +import { detectDelimiter, SkipFirstLineTransform } from '../src/bulkUtils.js'; describe('bulkUtils', () => { describe('csv', () => { @@ -31,4 +34,92 @@ expect(await detectDelimiter('./test/test-files/csv/tab.csv')).to.equal('TAB'); }); }); + + describe('SkipFirstLineTransform', () => { + async function streamToString(readable: Readable): Promise<string> { + const chunks: Buffer[] = []; + for await (const chunk of readable) { + chunks.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk as string)); + } + return Buffer.concat(chunks).toString('utf8'); + } + + it('skips first line with LF endings', async () => { + const input = 'Header1,Header2,Header3\nRow1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('skips first line with CRLF endings', async () => { + const input = 'Header1,Header2,Header3\r\nRow1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles empty stream after header', async () => { + const input = 'Header1,Header2,Header3\n'; + const expected = ''; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles single-line input without newline', async () => { + // Edge case: header with no newline at all + const input = 'Header1,Header2,Header3'; + const expected = ''; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles multi-byte UTF-8 characters in header', async () => { + const input = 'Header1,Hëàdér2,Header3\nRow1Col1,Row1Col2,Row1Col3\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles very long header line', async () => { + // Create a header with many columns + const headerCols = Array.from({ length: 100 }, (_, i) => `Header${i}`).join(','); + const dataCols = 
Array.from({ length: 100 }, (_, i) => `Data${i}`).join(','); + const input = `${headerCols}\n${dataCols}\n`; + const expected = `${dataCols}\n`; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('passes through data correctly in pipeline', async () => { + const input = 'Id,Name,Email\n1,John,john@example.com\n2,Jane,jane@example.com\n'; + const expected = '1,John,john@example.com\n2,Jane,jane@example.com\n'; + + const chunks: string[] = []; + await pipeline( + Readable.from(input), + new SkipFirstLineTransform(), + async function* (source: AsyncIterable<Buffer | string>) { + for await (const chunk of source) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + chunks.push(buffer.toString('utf8')); + yield chunk; + } + } + ); + + expect(chunks.join('')).to.equal(expected); + }); + }); }); diff --git a/yarn.lock b/yarn.lock index 66a25858..e901768d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8069,6 +8069,11 @@ undici-types@~7.18.0: resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-7.18.2.tgz#29357a89e7b7ca4aef3bf0fd3fd0cd73884229e9" integrity sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w== +undici@^7.22.0: + version "7.22.0" + resolved "https://registry.yarnpkg.com/undici/-/undici-7.22.0.tgz#7a82590a5908e504a47d85c60b0f89ca14240e60" + integrity sha512-RqslV2Us5BrllB+JeiZnK4peryVTndy9Dnqq62S3yYRRTj0tFQCwEniUy2167skdGOy3vqRzEvl1Dm4sV2ReDg== + unicorn-magic@^0.3.0: version "0.3.0" resolved "https://registry.yarnpkg.com/unicorn-magic/-/unicorn-magic-0.3.0.tgz#4efd45c85a69e0dd576d25532fbfa22aa5c8a104"