From 5fd3b4cfd935b0a7ce88d9f18ecbacb40653c10e Mon Sep 17 00:00:00 2001 From: Cristian Dominguez Date: Tue, 24 Feb 2026 11:53:36 -0300 Subject: [PATCH 1/5] test: bypass jsforce request buffering --- package.json | 1 + src/bulkUtils.ts | 111 ++++++++++++++++++++++++++++++++--------- test/bulkUtils.test.ts | 105 +++++++++++++++++++++++++++++++++++++- yarn.lock | 5 ++ 4 files changed, 197 insertions(+), 25 deletions(-) diff --git a/package.json b/package.json index 3ee2640b..2c406d6b 100644 --- a/package.json +++ b/package.json @@ -135,6 +135,7 @@ "csv-stringify": "^6.6.0", "form-data": "^4.0.5", "terminal-link": "^3.0.0", + "undici": "^7.22.0", "zod": "^4.3.6" }, "devDependencies": { diff --git a/src/bulkUtils.ts b/src/bulkUtils.ts index d9169804..b7b196f5 100644 --- a/src/bulkUtils.ts +++ b/src/bulkUtils.ts @@ -14,12 +14,12 @@ * limitations under the License. */ -import { Transform, Readable } from 'node:stream'; +import { Transform, Readable, TransformCallback } from 'node:stream'; import { createInterface } from 'node:readline'; import { pipeline } from 'node:stream/promises'; import * as fs from 'node:fs'; import { EOL } from 'node:os'; -import { HttpApi } from '@jsforce/jsforce-node/lib/http-api.js'; +import { fetch } from 'undici'; import { HttpResponse } from '@jsforce/jsforce-node'; import { IngestJobV2Results, @@ -75,28 +75,91 @@ export enum ColumnDelimiter { export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter; -async function bulkRequest(conn: Connection, url: string): Promise<{ body: string; headers: HttpResponse['headers'] }> { - const httpApi = new HttpApi(conn, { - responseType: 'text/plain', // this ensures jsforce doesn't try parsing the body - }); +/** + * Transform stream that skips the first line of CSV data (the header row). + * Used when processing subsequent bulk result pages to avoid duplicate headers. + */ +export class SkipFirstLineTransform extends Transform { + private firstLineSkipped = false; + private buffer = ''; - let headers: HttpResponse['headers'] | undefined; + public constructor() { + super(); + } - httpApi.on('response', (response: HttpResponse) => { - headers = response.headers; - }); + public _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void { + if (this.firstLineSkipped) { + // After first line is skipped, pass through all subsequent data + callback(null, chunk); + return; + } + + // Buffer incoming data until we find the first newline + this.buffer += chunk.toString('utf8'); + + const newlineIndex = this.buffer.indexOf('\n'); + + if (newlineIndex === -1) { + // No newline yet, keep buffering + callback(); + return; + } + + // Found the newline, skip everything up to and including it + const remainingData = this.buffer.slice(newlineIndex + 1); + this.firstLineSkipped = true; + this.buffer = ''; // Clear buffer to free memory + + callback(null, Buffer.from(remainingData, 'utf8')); + } - const body = await httpApi.request({ - url: conn.normalizeUrl(url), + public _flush(callback: TransformCallback): void { + // If we reach the end without finding a newline, clear buffer and finish + this.buffer = ''; + callback(); + } +} + +async function bulkRequest( + conn: Connection, + url: string +): Promise<{ stream: Readable; headers: HttpResponse['headers'] }> { + // Bypass jsforce entirely and use undici fetch to avoid any buffering. + // jsforce's Transport.httpRequest() adds a 'complete' listener which triggers readAll() buffering. + // Using undici fetch directly gives us the raw response stream without any intermediate buffering. + + const normalizedUrl = conn.normalizeUrl(url); + + // Prepare request headers with authorization + const headers: { [name: string]: string } = { + 'content-Type': 'text/csv', + }; + + if (conn.accessToken) { + headers.Authorization = `Bearer ${conn.accessToken}`; + } + + const response = await fetch(normalizedUrl, { method: 'GET', + headers, }); - if (!headers) throw new Error('failed to get HTTP headers for bulk query'); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } - return { - body, - headers, - }; + if (!response.body) { + throw new Error('No body was returned'); + } + const stream = Readable.fromWeb(response.body); + + // Extract headers in the format jsforce expects + const responseHeaders: HttpResponse['headers'] = {}; + response.headers.forEach((value: string, key: string) => { + responseHeaders[key] = value; + }); + + return { stream, headers: responseHeaders }; } export async function exportRecords( @@ -124,6 +187,9 @@ export async function exportRecords( let recordsWritten = 0; + // refresh here because `bulkRequest` uses undici for fetching results. + await conn.refreshAuth(); + while (locator !== 'null') { // we can't parallelize this because we: // 1. need to get 1 batch to know the locator for the next one @@ -151,7 +217,7 @@ export async function exportRecords( // eslint-disable-next-line no-await-in-loop await pipeline( - Readable.from(res.body), + res.stream, new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }), new Transform({ objectMode: true, @@ -173,18 +239,15 @@ export async function exportRecords( await pipeline( locator ? [ - // Skip the 1st row (CSV header) by finding the index of the first `LF` - // occurence and move the position 1 char ahead. - // - // CSVs using `CRLF` are still handled correctly because `CR` and `LF` are different chars in the string. - Readable.from(res.body.slice(res.body.indexOf('\n') + 1)), + res.stream, + new SkipFirstLineTransform(), fs.createWriteStream(outputInfo.filePath, { // Open file for appending. The file is created if it does not exist. // https://nodejs.org/api/fs.html#file-system-flags flags: 'a', // append mode }), ] - : [Readable.from(res.body), fs.createWriteStream(outputInfo.filePath)] + : [res.stream, fs.createWriteStream(outputInfo.filePath)] ); } diff --git a/test/bulkUtils.test.ts b/test/bulkUtils.test.ts index 820bde3f..7457cf59 100644 --- a/test/bulkUtils.test.ts +++ b/test/bulkUtils.test.ts @@ -14,9 +14,12 @@ * limitations under the License. */ +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { expect } from 'chai'; -import { detectDelimiter } from '../src/bulkUtils.js'; +import { detectDelimiter, SkipFirstLineTransform } from '../src/bulkUtils.js'; describe('bulkUtils', () => { describe('csv', () => { @@ -31,4 +34,104 @@ describe('bulkUtils', () => { expect(await detectDelimiter('./test/test-files/csv/tab.csv')).to.equal('TAB'); }); }); + + describe.only('SkipFirstLineTransform', () => { + async function streamToString(readable: Readable): Promise { + const chunks: Buffer[] = []; + for await (const chunk of readable) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string)); + } + return Buffer.concat(chunks).toString('utf8'); + } + + it('skips first line with LF endings', async () => { + const input = 'Header1,Header2,Header3\nRow1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\nRow2Col1,Row2Col2,Row2Col3\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('skips first line with CRLF endings', async () => { + const input = 'Header1,Header2,Header3\r\nRow1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\r\nRow2Col1,Row2Col2,Row2Col3\r\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles header split across chunks', async () => { + // Simulate a stream where the header is split across multiple chunks + const chunk1 = 'Header1,Head'; + const chunk2 = 'er2,Header3\nRow1Col1,Row1Col2,Row1Col3\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\n'; + + const input = Readable.from([chunk1, chunk2]); + const result = await streamToString(input.pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles empty stream after header', async () => { + const input = 'Header1,Header2,Header3\n'; + const expected = ''; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles single-line input without newline', async () => { + // Edge case: header with no newline at all + const input = 'Header1,Header2,Header3'; + const expected = ''; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles multi-byte UTF-8 characters in header', async () => { + const input = 'Header1,Hëàdér2,Header3\nRow1Col1,Row1Col2,Row1Col3\n'; + const expected = 'Row1Col1,Row1Col2,Row1Col3\n'; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('handles very long header line', async () => { + // Create a header with many columns + const headerCols = Array.from({ length: 100 }, (_, i) => `Header${i}`).join(','); + const dataCols = Array.from({ length: 100 }, (_, i) => `Data${i}`).join(','); + const input = `${headerCols}\n${dataCols}\n`; + const expected = `${dataCols}\n`; + + const result = await streamToString(Readable.from(input).pipe(new SkipFirstLineTransform())); + + expect(result).to.equal(expected); + }); + + it('passes through data correctly in pipeline', async () => { + const input = 'Id,Name,Email\n1,John,john@example.com\n2,Jane,jane@example.com\n'; + const expected = '1,John,john@example.com\n2,Jane,jane@example.com\n'; + + const chunks: string[] = []; + await pipeline( + Readable.from(input), + new SkipFirstLineTransform(), + async function* (source: AsyncIterable) { + for await (const chunk of source) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string); + chunks.push(buffer.toString('utf8')); + yield chunk; + } + } + ); + + expect(chunks.join('')).to.equal(expected); + }); + }); }); diff --git a/yarn.lock b/yarn.lock index 3221260a..454eedd2 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7553,6 +7553,11 @@ undici-types@~6.19.2: resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.19.6.tgz#e218c3df0987f4c0e0008ca00d6b6472d9b89b36" integrity sha512-e/vggGopEfTKSvj4ihnOLTsqhrKRN3LeO6qSN/GxohhuRv8qH9bNQ4B8W7e/vFL+0XTnmHPB4/kegunZGA4Org== +undici@^7.22.0: + version "7.22.0" + resolved "https://registry.yarnpkg.com/undici/-/undici-7.22.0.tgz#7a82590a5908e504a47d85c60b0f89ca14240e60" + integrity sha512-RqslV2Us5BrllB+JeiZnK4peryVTndy9Dnqq62S3yYRRTj0tFQCwEniUy2167skdGOy3vqRzEvl1Dm4sV2ReDg== + unicorn-magic@^0.3.0: version "0.3.0" resolved "https://registry.yarnpkg.com/unicorn-magic/-/unicorn-magic-0.3.0.tgz#4efd45c85a69e0dd576d25532fbfa22aa5c8a104" From 99ad56d29b54d8fd4831f84d41b3a601c35092b4 Mon Sep 17 00:00:00 2001 From: Cristian Dominguez Date: Tue, 24 Feb 2026 12:10:21 -0300 Subject: [PATCH 2/5] chore: buffer optimizations --- src/bulkUtils.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/bulkUtils.ts b/src/bulkUtils.ts index b7b196f5..e8ca829b 100644 --- a/src/bulkUtils.ts +++ b/src/bulkUtils.ts @@ -78,10 +78,12 @@ export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter; /** * Transform stream that skips the first line of CSV data (the header row). * Used when processing subsequent bulk result pages to avoid duplicate headers. + * + * Optimized to work directly with Buffers without string conversion for better memory efficiency. */ export class SkipFirstLineTransform extends Transform { private firstLineSkipped = false; - private buffer = ''; + private buffer: Buffer = Buffer.alloc(0); public constructor() { super(); @@ -95,9 +97,11 @@ export class SkipFirstLineTransform extends Transform { } // Buffer incoming data until we find the first newline - this.buffer += chunk.toString('utf8'); + // Work directly with Buffers to avoid string conversion overhead + this.buffer = Buffer.concat([this.buffer, chunk]); - const newlineIndex = this.buffer.indexOf('\n'); + // Find newline byte (0x0A for \n) + const newlineIndex = this.buffer.indexOf(0x0a); if (newlineIndex === -1) { // No newline yet, keep buffering @@ -106,16 +110,16 @@ export class SkipFirstLineTransform extends Transform { } // Found the newline, skip everything up to and including it - const remainingData = this.buffer.slice(newlineIndex + 1); + const remainingData = this.buffer.subarray(newlineIndex + 1); this.firstLineSkipped = true; - this.buffer = ''; // Clear buffer to free memory + this.buffer = Buffer.alloc(0); // Clear buffer to free memory - callback(null, Buffer.from(remainingData, 'utf8')); + callback(null, remainingData); } public _flush(callback: TransformCallback): void { // If we reach the end without finding a newline, clear buffer and finish - this.buffer = ''; + this.buffer = Buffer.alloc(0); callback(); } } From cc6827a4718b66f59287298e629951a478663cef Mon Sep 17 00:00:00 2001 From: svc-cli-bot Date: Tue, 24 Feb 2026 15:26:28 +0000 Subject: [PATCH 3/5] chore(release): 4.0.74-dev.0 [skip ci] --- README.md | 48 ++++++++++++++++++++++++------------------------ package.json | 2 +- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 0ac58358..3809b695 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ EXAMPLES $ sf data bulk results --job-id 7507i000fake341G --target-org my-scratch ``` -_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/bulk/results.ts)_ +_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/bulk/results.ts)_ ## `sf data create file` @@ -193,7 +193,7 @@ EXAMPLES $ sf data create file --file path/to/astro.png --parent-id a03fakeLoJWPIA3 ``` -_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/file.ts)_ +_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/create/file.ts)_ ## `sf data create record` @@ -249,7 +249,7 @@ EXAMPLES TracedEntityId=01p17000000R6bLAAS" ``` -_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/record.ts)_ +_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/create/record.ts)_ ## `sf data delete bulk` @@ -308,7 +308,7 @@ FLAG DESCRIPTIONS and can be enabled only by a system administrator. ``` -_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/bulk.ts)_ +_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/delete/bulk.ts)_ ## `sf data delete record` @@ -369,7 +369,7 @@ EXAMPLES $ sf data delete record --use-tooling-api --sobject TraceFlag --record-id 7tf8c ``` -_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/record.ts)_ +_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/delete/record.ts)_ ## `sf data delete resume` @@ -408,7 +408,7 @@ EXAMPLES $ sf data delete resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/resume.ts)_ +_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/delete/resume.ts)_ ## `sf data export bulk` @@ -475,7 +475,7 @@ EXAMPLES --result-format json --wait 10 --all-rows ``` -_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/bulk.ts)_ +_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/export/bulk.ts)_ ## `sf data export resume` @@ -514,7 +514,7 @@ EXAMPLES $ sf data export resume --use-most-recent ``` -_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/resume.ts)_ +_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/export/resume.ts)_ ## `sf data export tree` @@ -574,7 +574,7 @@ EXAMPLES my-scratch ``` -_See code: [src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/tree.ts)_ +_See code: [src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/export/tree.ts)_ ## `sf data get record` @@ -638,7 +638,7 @@ EXAMPLES $ sf data get record --use-tooling-api --sobject TraceFlag --record-id 7tf8c ``` -_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/get/record.ts)_ +_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/get/record.ts)_ ## `sf data import bulk` @@ -690,7 +690,7 @@ EXAMPLES $ sf data import bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch ``` -_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/bulk.ts)_ +_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/import/bulk.ts)_ ## `sf data import resume` @@ -726,7 +726,7 @@ EXAMPLES $ sf data import resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/resume.ts)_ +_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/import/resume.ts)_ ## `sf data import tree` @@ -790,7 +790,7 @@ FLAG DESCRIPTIONS - files(array) - Files: An array of files paths to load ``` -_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/tree.ts)_ +_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/import/tree.ts)_ ## `sf data query` @@ -843,7 +843,7 @@ EXAMPLES $ sf data query --query "SELECT Name FROM ApexTrigger" --use-tooling-api ``` -_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/query.ts)_ +_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/query.ts)_ ## `sf data resume` @@ -880,7 +880,7 @@ EXAMPLES $ sf data resume --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA ``` -_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/resume.ts)_ +_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/resume.ts)_ ## `sf data search` @@ -930,7 +930,7 @@ EXAMPLES $ sf data search --file query.txt --target-org my-scratch --result-format csv ``` -_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/search.ts)_ +_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/search.ts)_ ## `sf data update bulk` @@ -985,7 +985,7 @@ EXAMPLES $ sf data update bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch ``` -_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/bulk.ts)_ +_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/update/bulk.ts)_ ## `sf data update record` @@ -1047,7 +1047,7 @@ EXAMPLES "ExpirationDate=2017-12-01T00:58:04.000+0000" ``` -_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/record.ts)_ +_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/update/record.ts)_ ## `sf data update resume` @@ -1086,7 +1086,7 @@ EXAMPLES $ sf data update resume --use-most-recent ``` -_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/resume.ts)_ +_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/update/resume.ts)_ ## `sf data upsert bulk` @@ -1142,7 +1142,7 @@ EXAMPLES my-scratch ``` -_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/bulk.ts)_ +_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/upsert/bulk.ts)_ ## `sf data upsert resume` @@ -1181,7 +1181,7 @@ EXAMPLES $ sf data upsert resume --use-most-recent --target-org my-scratch ``` -_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/resume.ts)_ +_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/data/upsert/resume.ts)_ ## `sf force data bulk delete` @@ -1228,7 +1228,7 @@ EXAMPLES $ sf force data bulk delete --sobject MyObject__c --file files/delete.csv --wait 5 --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/delete.ts)_ +_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/force/data/bulk/delete.ts)_ ## `sf force data bulk status` @@ -1265,7 +1265,7 @@ EXAMPLES $ sf force data bulk status --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/status.ts)_ +_See code: [src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/force/data/bulk/status.ts)_ ## `sf force data bulk upsert` @@ -1323,6 +1323,6 @@ EXAMPLES --target-org my-scratch ``` -_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/upsert.ts)_ +_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.74-dev.0/src/commands/force/data/bulk/upsert.ts)_ diff --git a/package.json b/package.json index 2c406d6b..617acab5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@salesforce/plugin-data", - "version": "4.0.73", + "version": "4.0.74-dev.0", "description": "Plugin for salesforce data commands", "author": "Salesforce", "homepage": "https://github.com/salesforcecli/plugin-data", From 538884e81e45c985ae50b58f696334cc066ec758 Mon Sep 17 00:00:00 2001 From: Cristian Dominguez Date: Mon, 2 Mar 2026 13:45:23 -0300 Subject: [PATCH 4/5] test: remove stream test case/only --- test/bulkUtils.test.ts | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/test/bulkUtils.test.ts b/test/bulkUtils.test.ts index 7457cf59..d53812a8 100644 --- a/test/bulkUtils.test.ts +++ b/test/bulkUtils.test.ts @@ -35,7 +35,7 @@ describe('bulkUtils', () => { }); }); - describe.only('SkipFirstLineTransform', () => { + describe('SkipFirstLineTransform', () => { async function streamToString(readable: Readable): Promise { const chunks: Buffer[] = []; for await (const chunk of readable) { @@ -62,18 +62,6 @@ describe('bulkUtils', () => { expect(result).to.equal(expected); }); - it('handles header split across chunks', async () => { - // Simulate a stream where the header is split across multiple chunks - const chunk1 = 'Header1,Head'; - const chunk2 = 'er2,Header3\nRow1Col1,Row1Col2,Row1Col3\n'; - const expected = 'Row1Col1,Row1Col2,Row1Col3\n'; - - const input = Readable.from([chunk1, chunk2]); - const result = await streamToString(input.pipe(new SkipFirstLineTransform())); - - expect(result).to.equal(expected); - }); - it('handles empty stream after header', async () => { const input = 'Header1,Header2,Header3\n'; const expected = ''; @@ -124,7 +112,7 @@ describe('bulkUtils', () => { new SkipFirstLineTransform(), async function* (source: AsyncIterable) { for await (const chunk of source) { - const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string); + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); chunks.push(buffer.toString('utf8')); yield chunk; } From 487e13c77f972a4a8c353b823a4112553c139764 Mon Sep 17 00:00:00 2001 From: Cristian Dominguez Date: Mon, 2 Mar 2026 13:46:41 -0300 Subject: [PATCH 5/5] chore: bulkRequest handles auth refresh --- src/bulkUtils.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bulkUtils.ts b/src/bulkUtils.ts index e8ca829b..8d407dcb 100644 --- a/src/bulkUtils.ts +++ b/src/bulkUtils.ts @@ -139,6 +139,9 @@ async function bulkRequest( 'content-Type': 'text/csv', }; + // ensure undici gets a valid access token + await conn.refreshAuth(); + if (conn.accessToken) { headers.Authorization = `Bearer ${conn.accessToken}`; } @@ -191,9 +194,6 @@ export async function exportRecords( let recordsWritten = 0; - // refresh here because `bulkRequest` uses undici for fetching results. - await conn.refreshAuth(); - while (locator !== 'null') { // we can't parallelize this because we: // 1. need to get 1 batch to know the locator for the next one