Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ EXAMPLES
$ sf data bulk results --job-id 7507i000fake341G --target-org my-scratch
```

_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/bulk/results.ts)_
_See code: [src/commands/data/bulk/results.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/bulk/results.ts)_

## `sf data create file`

Expand Down Expand Up @@ -193,7 +193,7 @@ EXAMPLES
$ sf data create file --file path/to/astro.png --parent-id a03fakeLoJWPIA3
```

_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/create/file.ts)_
_See code: [src/commands/data/create/file.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/file.ts)_

## `sf data create record`

Expand Down Expand Up @@ -249,7 +249,7 @@ EXAMPLES
TracedEntityId=01p17000000R6bLAAS"
```

_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/create/record.ts)_
_See code: [src/commands/data/create/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/create/record.ts)_

## `sf data delete bulk`

Expand Down Expand Up @@ -308,7 +308,7 @@ FLAG DESCRIPTIONS
and can be enabled only by a system administrator.
```

_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/bulk.ts)_
_See code: [src/commands/data/delete/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/bulk.ts)_

## `sf data delete record`

Expand Down Expand Up @@ -369,7 +369,7 @@ EXAMPLES
$ sf data delete record --use-tooling-api --sobject TraceFlag --record-id 7tf8c
```

_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/record.ts)_
_See code: [src/commands/data/delete/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/record.ts)_

## `sf data delete resume`

Expand Down Expand Up @@ -408,7 +408,7 @@ EXAMPLES
$ sf data delete resume --use-most-recent --target-org my-scratch
```

_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/delete/resume.ts)_
_See code: [src/commands/data/delete/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/delete/resume.ts)_

## `sf data export bulk`

Expand Down Expand Up @@ -475,7 +475,7 @@ EXAMPLES
--result-format json --wait 10 --all-rows
```

_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/bulk.ts)_
_See code: [src/commands/data/export/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/bulk.ts)_

## `sf data export resume`

Expand Down Expand Up @@ -514,7 +514,7 @@ EXAMPLES
$ sf data export resume --use-most-recent
```

_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/resume.ts)_
_See code: [src/commands/data/export/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/resume.ts)_

## `sf data export tree`

Expand Down Expand Up @@ -574,7 +574,7 @@ EXAMPLES
my-scratch
```

_See code: [src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/export/tree.ts)_
_See code: [src/commands/data/export/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/export/tree.ts)_

## `sf data get record`

Expand Down Expand Up @@ -638,7 +638,7 @@ EXAMPLES
$ sf data get record --use-tooling-api --sobject TraceFlag --record-id 7tf8c
```

_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/get/record.ts)_
_See code: [src/commands/data/get/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/get/record.ts)_

## `sf data import bulk`

Expand Down Expand Up @@ -690,7 +690,7 @@ EXAMPLES
$ sf data import bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch
```

_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/bulk.ts)_
_See code: [src/commands/data/import/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/bulk.ts)_

## `sf data import resume`

Expand Down Expand Up @@ -726,7 +726,7 @@ EXAMPLES
$ sf data import resume --use-most-recent --target-org my-scratch
```

_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/resume.ts)_
_See code: [src/commands/data/import/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/resume.ts)_

## `sf data import tree`

Expand Down Expand Up @@ -790,7 +790,7 @@ FLAG DESCRIPTIONS
- files(array) - Files: An array of files paths to load
```

_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/import/tree.ts)_
_See code: [src/commands/data/import/tree.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/import/tree.ts)_

## `sf data query`

Expand Down Expand Up @@ -843,7 +843,7 @@ EXAMPLES
$ sf data query --query "SELECT Name FROM ApexTrigger" --use-tooling-api
```

_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/query.ts)_
_See code: [src/commands/data/query.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/query.ts)_

## `sf data resume`

Expand Down Expand Up @@ -880,7 +880,7 @@ EXAMPLES
$ sf data resume --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA
```

_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/resume.ts)_
_See code: [src/commands/data/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/resume.ts)_

## `sf data search`

Expand Down Expand Up @@ -930,7 +930,7 @@ EXAMPLES
$ sf data search --file query.txt --target-org my-scratch --result-format csv
```

_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/search.ts)_
_See code: [src/commands/data/search.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/search.ts)_

## `sf data update bulk`

Expand Down Expand Up @@ -985,7 +985,7 @@ EXAMPLES
$ sf data update bulk --file accounts.csv --sobject Account --wait 10 --target-org my-scratch
```

_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/bulk.ts)_
_See code: [src/commands/data/update/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/bulk.ts)_

## `sf data update record`

Expand Down Expand Up @@ -1047,7 +1047,7 @@ EXAMPLES
"ExpirationDate=2017-12-01T00:58:04.000+0000"
```

_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/record.ts)_
_See code: [src/commands/data/update/record.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/record.ts)_

## `sf data update resume`

Expand Down Expand Up @@ -1086,7 +1086,7 @@ EXAMPLES
$ sf data update resume --use-most-recent
```

_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/update/resume.ts)_
_See code: [src/commands/data/update/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/update/resume.ts)_

## `sf data upsert bulk`

Expand Down Expand Up @@ -1142,7 +1142,7 @@ EXAMPLES
my-scratch
```

_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/upsert/bulk.ts)_
_See code: [src/commands/data/upsert/bulk.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/bulk.ts)_

## `sf data upsert resume`

Expand Down Expand Up @@ -1181,7 +1181,7 @@ EXAMPLES
$ sf data upsert resume --use-most-recent --target-org my-scratch
```

_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/data/upsert/resume.ts)_
_See code: [src/commands/data/upsert/resume.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/data/upsert/resume.ts)_

## `sf force data bulk delete`

Expand Down Expand Up @@ -1228,7 +1228,7 @@ EXAMPLES
$ sf force data bulk delete --sobject MyObject__c --file files/delete.csv --wait 5 --target-org my-scratch
```

_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/delete.ts)_
_See code: [src/commands/force/data/bulk/delete.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/delete.ts)_

## `sf force data bulk status`

Expand Down Expand Up @@ -1265,7 +1265,7 @@ EXAMPLES
$ sf force data bulk status --job-id 750xx000000005sAAA --batch-id 751xx000000005nAAA --target-org my-scratch
```

_See code: [src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/status.ts)_
_See code: [src/commands/force/data/bulk/status.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/status.ts)_

## `sf force data bulk upsert`

Expand Down Expand Up @@ -1323,6 +1323,6 @@ EXAMPLES
--target-org my-scratch
```

_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.75/src/commands/force/data/bulk/upsert.ts)_
_See code: [src/commands/force/data/bulk/upsert.ts](https://github.com/salesforcecli/plugin-data/blob/4.0.73/src/commands/force/data/bulk/upsert.ts)_

<!-- commandsstop -->
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@salesforce/plugin-data",
"version": "4.0.75",
"version": "4.0.74-dev.0",
"description": "Plugin for salesforce data commands",
"author": "Salesforce",
"homepage": "https://github.com/salesforcecli/plugin-data",
Expand Down Expand Up @@ -135,6 +135,7 @@
"csv-stringify": "^6.6.0",
"form-data": "^4.0.5",
"terminal-link": "^3.0.0",
"undici": "^7.22.0",
"zod": "^4.3.6"
},
"devDependencies": {
Expand Down
115 changes: 91 additions & 24 deletions src/bulkUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

import { Transform, Readable } from 'node:stream';
import { Transform, Readable, TransformCallback } from 'node:stream';
import { createInterface } from 'node:readline';
import { pipeline } from 'node:stream/promises';
import * as fs from 'node:fs';
import { EOL } from 'node:os';
import { HttpApi } from '@jsforce/jsforce-node/lib/http-api.js';
import { fetch } from 'undici';
import { HttpResponse } from '@jsforce/jsforce-node';
import {
IngestJobV2Results,
Expand Down Expand Up @@ -75,28 +75,98 @@ export enum ColumnDelimiter {

export type ColumnDelimiterKeys = keyof typeof ColumnDelimiter;

async function bulkRequest(conn: Connection, url: string): Promise<{ body: string; headers: HttpResponse['headers'] }> {
const httpApi = new HttpApi(conn, {
responseType: 'text/plain', // this ensures jsforce doesn't try parsing the body
});
/**
* Transform stream that skips the first line of CSV data (the header row).
* Used when processing subsequent bulk result pages to avoid duplicate headers.
*
* Optimized to work directly with Buffers without string conversion for better memory efficiency.
*/
export class SkipFirstLineTransform extends Transform {
private firstLineSkipped = false;
private buffer: Buffer = Buffer.alloc(0);

public constructor() {
super();
}

public _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
if (this.firstLineSkipped) {
// After first line is skipped, pass through all subsequent data
callback(null, chunk);
return;
}

let headers: HttpResponse['headers'] | undefined;
// Buffer incoming data until we find the first newline
// Work directly with Buffers to avoid string conversion overhead
this.buffer = Buffer.concat([this.buffer, chunk]);

httpApi.on('response', (response: HttpResponse) => {
headers = response.headers;
});
// Find newline byte (0x0A for \n)
const newlineIndex = this.buffer.indexOf(0x0a);

if (newlineIndex === -1) {
// No newline yet, keep buffering
callback();
return;
}

// Found the newline, skip everything up to and including it
const remainingData = this.buffer.subarray(newlineIndex + 1);
this.firstLineSkipped = true;
this.buffer = Buffer.alloc(0); // Clear buffer to free memory

callback(null, remainingData);
}

public _flush(callback: TransformCallback): void {
// If we reach the end without finding a newline, clear buffer and finish
this.buffer = Buffer.alloc(0);
callback();
}
}

const body = await httpApi.request<string>({
url: conn.normalizeUrl(url),
async function bulkRequest(
conn: Connection,
url: string
): Promise<{ stream: Readable; headers: HttpResponse['headers'] }> {
// Bypass jsforce entirely and use undici fetch to avoid any buffering.
// jsforce's Transport.httpRequest() adds a 'complete' listener which triggers readAll() buffering.
// Using undici fetch directly gives us the raw response stream without any intermediate buffering.

const normalizedUrl = conn.normalizeUrl(url);

// Prepare request headers with authorization
const headers: { [name: string]: string } = {
'content-Type': 'text/csv',
};

// ensure undici gets a valid access token
await conn.refreshAuth();

if (conn.accessToken) {
headers.Authorization = `Bearer ${conn.accessToken}`;
}

const response = await fetch(normalizedUrl, {
method: 'GET',
headers,
});

if (!headers) throw new Error('failed to get HTTP headers for bulk query');
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

return {
body,
headers,
};
if (!response.body) {
throw new Error('No body was returned');
}
const stream = Readable.fromWeb(response.body);

// Extract headers in the format jsforce expects
const responseHeaders: HttpResponse['headers'] = {};
response.headers.forEach((value: string, key: string) => {
responseHeaders[key] = value;
});

return { stream, headers: responseHeaders };
}

export async function exportRecords(
Expand Down Expand Up @@ -151,7 +221,7 @@ export async function exportRecords(

// eslint-disable-next-line no-await-in-loop
await pipeline(
Readable.from(res.body),
res.stream,
new csvParse({ columns: true, delimiter: ColumnDelimiter[outputInfo.columnDelimiter] }),
new Transform({
objectMode: true,
Expand All @@ -173,18 +243,15 @@ export async function exportRecords(
await pipeline(
locator
? [
// Skip the 1st row (CSV header) by finding the index of the first `LF`
// occurence and move the position 1 char ahead.
//
// CSVs using `CRLF` are still handled correctly because `CR` and `LF` are different chars in the string.
Readable.from(res.body.slice(res.body.indexOf('\n') + 1)),
res.stream,
new SkipFirstLineTransform(),
fs.createWriteStream(outputInfo.filePath, {
// Open file for appending. The file is created if it does not exist.
// https://nodejs.org/api/fs.html#file-system-flags
flags: 'a', // append mode
}),
]
: [Readable.from(res.body), fs.createWriteStream(outputInfo.filePath)]
: [res.stream, fs.createWriteStream(outputInfo.filePath)]
);
}

Expand Down
Loading
Loading