The Bulk API v1 enables you to process large volumes of records asynchronously in Salesforce. It uses a job-based architecture where you create a job, add batches of records, and poll for results.
Overview
Bulk API v1 is ideal for:
Loading or deleting large data sets (thousands to millions of records)
Querying large data sets asynchronously
Operating on data when user interaction is not required
The API uses a three-step process:
Create a job - Define the object and operation type
Add batches - Upload data in CSV format
Close job and poll - Mark job complete and wait for results
Quick Start
Simple Bulk Insert
```javascript
const accounts = [
  { Name: 'Account 1', Industry: 'Technology' },
  { Name: 'Account 2', Industry: 'Finance' },
  { Name: 'Account 3', Industry: 'Healthcare' }
];

const results = await conn.bulk.load('Account', 'insert', accounts);
console.log(results);
// [
//   { id: '001...', success: true, created: true, errors: [] },
//   { id: '001...', success: true, created: true, errors: [] },
//   { id: '001...', success: true, created: true, errors: [] }
// ]
```
Streaming from CSV File
```javascript
const fs = require('fs');

const csvFile = fs.createReadStream('accounts.csv');
const batch = conn.bulk.load('Account', 'insert');
batch.on('response', results => {
  console.log('Number of records processed:', results.length);
});
csvFile.pipe(batch.stream());
```
Supported Operations
The operation parameter specifies the type of operation to perform on the records:
insert - Create new records
update - Update existing records by Id
upsert - Insert or update records based on external ID
delete - Soft delete records
hardDelete - Permanently delete records
query - Query records asynchronously
queryAll - Query including deleted and archived records
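Whether an operation ingests records or queries them determines the shape of the results you get back later from retrieve(). A small classifier sketch (hypothetical helpers, not part of the library):

```javascript
// Hypothetical helpers: classify a Bulk API v1 operation string.
// Ingest operations return per-record results; query operations return result IDs.
const INGEST_OPERATIONS = new Set(['insert', 'update', 'upsert', 'delete', 'hardDelete']);
const QUERY_OPERATIONS = new Set(['query', 'queryAll']);

function isIngestOperation(operation) {
  return INGEST_OPERATIONS.has(operation);
}

function isQueryOperation(operation) {
  return QUERY_OPERATIONS.has(operation);
}
```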
Bulk Class
The main entry point for Bulk API v1 operations.
Properties
pollInterval - Polling interval in milliseconds. Controls how frequently the API checks job status.
pollTimeout - Polling timeout in milliseconds. Maximum time to wait for job completion.
load()
Create and execute a bulk load operation.
```javascript
const records = [
  { Name: 'Company A', Industry: 'Tech' },
  { Name: 'Company B', Industry: 'Finance' }
];

const results = await conn.bulk.load('Account', 'insert', records);
```
Parameters:
type (string) - The Salesforce object type (e.g., 'Account', 'Contact')
operation (string) - The operation to perform (see Supported Operations)
options (object, optional) - Configuration for the bulk operation:
  extIdField - External ID field name for upsert operations
  concurrencyMode - How batches should be processed ('Parallel' or 'Serial')
  assignmentRuleId - Assignment rule ID to use for Case or Lead objects
input (Record[] | Readable | string) - The data to load (array of records, CSV string, or readable stream)
Returns: Batch<S, Opr> - A Batch instance (also a Promise)
query()
Execute a bulk query and get results as a record stream.
```javascript
const soql = 'SELECT Id, Name, Industry FROM Account WHERE CreatedDate = TODAY';
const recordStream = await conn.bulk.query(soql);
recordStream.stream('csv').pipe(process.stdout);
```
Parameters:
soql (string) - The SOQL query to execute
Returns: Promise<Parsable<Record>> - A record stream with query results
createJob()
Create a new job instance for manual batch management.
```javascript
const job = conn.bulk.createJob('Account', 'insert');
await job.open();

const batch = job.createBatch();
batch.execute(records);
batch.on('queue', () => {
  console.log('Batch queued:', batch.id);
});

await job.close();
```
Parameters:
type (string) - The Salesforce object type
options (object, optional) - Job configuration options
Returns: Job<S, Opr> - A new job instance
job()
Get an existing job by ID.
```javascript
const job = conn.bulk.job('750...');
const jobInfo = await job.check();
console.log(jobInfo.state);
```
Parameters:
jobId (string) - The ID of the existing job
Returns: Job<S, Opr> - Job instance for the specified ID
Job Class
Represents a bulk API job.
Properties
id - The job ID assigned by Salesforce
state - Current job state: 'Open', 'Closed', 'Aborted', 'Failed', 'Unknown', or 'NotProcessed'
type - The Salesforce object type
operation - The operation being performed
open()
Open a new job in Salesforce.
```javascript
const job = conn.bulk.createJob('Account', 'update');
const jobInfo = await job.open();
console.log('Job created:', jobInfo.id);
```
Returns: Promise<JobInfo> - Information about the created job
createBatch()
Create a new batch within this job.
```javascript
const batch1 = job.createBatch();
batch1.execute(firstSetOfRecords);

const batch2 = job.createBatch();
batch2.execute(secondSetOfRecords);
```
Returns: Batch<S, Opr> - A new batch instance
batch()
Get an existing batch by ID.
```javascript
const batch = job.batch('751...');
const batchInfo = await batch.check();
console.log(batchInfo.state);
```
Parameters:
batchId (string) - The ID of the existing batch
Returns: Batch<S, Opr> - Batch instance
check()
Check the latest job status from Salesforce.
```javascript
const jobInfo = await job.check();
console.log('Batches completed:', jobInfo.numberBatchesCompleted);
console.log('Records processed:', jobInfo.numberRecordsProcessed);
```
Returns: Promise<JobInfo> - Updated job information
list()
List all batches in this job.
```javascript
const batches = await job.list();
batches.forEach(batch => {
  console.log(`Batch ${batch.id}: ${batch.state}`);
});
```
Returns: Promise<BatchInfo[]> - Array of batch information
close()
Close the job to indicate no more batches will be added.
Returns: Promise<JobInfo> - Updated job information
abort()
Abort the job and all its batches.
Returns: Promise<JobInfo> - Updated job information
Batch Class
Represents a batch within a job. Extends Node.js Writable stream.
Properties
id - The batch ID assigned by Salesforce
job - Reference to the parent job
execute()
Execute the batch with provided data.
```javascript
const batch = job.createBatch();
const results = await batch.execute(records);
```
Parameters:
input (string | Record[] | Readable) - Data to process (CSV string, array of records, or readable stream)
Returns: Batch<S, Opr> - Returns self for chaining
Aliases: run(), exec()
stream()
Get the duplex stream for streaming data in and results out.
```javascript
const batch = job.createBatch();
const batchStream = batch.stream();

fs.createReadStream('input.csv')
  .pipe(batchStream)
  .pipe(fs.createWriteStream('results.csv'));
```
Returns: Duplex - Duplex stream for batch data
check()
Check the current batch status.
```javascript
const batchInfo = await batch.check();
console.log('State:', batchInfo.state);
console.log('Records processed:', batchInfo.numberRecordsProcessed);
```
Returns: Promise<BatchInfo> - Current batch information
poll()
Poll for batch completion.
```javascript
batch.on('inProgress', batchInfo => {
  console.log('Still processing...');
});
batch.poll(1000, 60000); // Check every second for up to 60 seconds
```
Parameters:
interval (number) - Polling interval in milliseconds
timeout (number) - Total timeout in milliseconds
If timeout is 0, a PollingTimeoutError is thrown immediately without polling.
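The interval/timeout contract described above can be sketched as a standalone loop (a simplified illustration, not jsforce's actual implementation; checkStatus stands in for batch.check()):

```javascript
// Call checkStatus() every `interval` ms until the batch completes or fails,
// giving up after `timeout` ms. With timeout = 0 the first elapsed-time check
// fails immediately, mirroring the immediate timeout error described above.
async function pollUntilDone(checkStatus, interval, timeout) {
  const startedAt = Date.now();
  for (;;) {
    if (Date.now() - startedAt >= timeout) {
      const err = new Error('Polling time out');
      err.name = 'PollingTimeout';
      throw err;
    }
    const info = await checkStatus();
    if (info.state === 'Completed') return info;
    if (info.state === 'Failed') throw new Error('Batch failed');
    await new Promise(resolve => setTimeout(resolve, interval));
  }
}
```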
retrieve()
Retrieve batch results.
```javascript
const results = await batch.retrieve();
results.forEach(result => {
  if (result.success) {
    console.log('Success:', result.id);
  } else {
    console.log('Error:', result.errors);
  }
});
```
Returns: Promise<BatchResult<Opr>> - Batch results
For ingest operations (insert, update, upsert, delete, hardDelete), each result contains:
id - Record ID (null for failures)
success - Whether the operation succeeded
created - Whether the record was created
errors - Array of errors for failed records
For query operations, each result contains a result ID to pass to result().
result()
Fetch query batch result as a record stream.
```javascript
// For query operations only
const results = await batch.retrieve();
const resultStream = batch.result(results[0].id);
resultStream.stream('csv').pipe(process.stdout);
```
Parameters:
resultId (string) - The result ID from query batch results
Returns: Parsable<Record> - Record stream
Events
Both Job and Batch classes extend EventEmitter and emit various events.
Job Events
```javascript
const job = conn.bulk.createJob('Account', 'insert');

job.on('open', jobInfo => {
  console.log('Job opened:', jobInfo.id);
});
job.on('close', jobInfo => {
  console.log('Job closed');
});
job.on('error', err => {
  console.error('Job error:', err);
});
```
open - Emitted when job is opened
close - Emitted when job is closed
abort - Emitted when job is aborted
error - Emitted on error
Batch Events
```javascript
const batch = job.createBatch();

batch.on('queue', batchInfo => {
  console.log('Batch queued:', batchInfo.id);
});
batch.on('inProgress', batchInfo => {
  console.log('Processing:', batchInfo.numberRecordsProcessed);
});
batch.on('response', results => {
  console.log('Results received:', results.length);
});
batch.on('error', err => {
  console.error('Batch error:', err);
});
```
queue - Emitted when batch is queued
inProgress - Emitted during polling while batch is processing
response - Emitted when results are retrieved
error - Emitted on error
Advanced Usage
Manual Job and Batch Management
```javascript
const job = conn.bulk.createJob('Account', 'insert', {
  concurrencyMode: 'Serial'
});
await job.open();

// Create multiple batches
const batch1 = job.createBatch();
const batch2 = job.createBatch();
batch1.execute(firstBatch);
batch2.execute(secondBatch);

// Wait for both batches
await Promise.all([
  batch1.then(results => console.log('Batch 1 done:', results.length)),
  batch2.then(results => console.log('Batch 2 done:', results.length))
]);

await job.close();
```
Upsert with External ID
```javascript
const contacts = [
  { Email__c: '[email protected]', FirstName: 'John', LastName: 'Doe' },
  { Email__c: '[email protected]', FirstName: 'Jane', LastName: 'Smith' }
];

const results = await conn.bulk.load(
  'Contact',
  'upsert',
  { extIdField: 'Email__c' },
  contacts
);
```
Custom Polling Configuration
```javascript
// Set global polling options
conn.bulk.pollInterval = 5000; // Check every 5 seconds
conn.bulk.pollTimeout = 300000; // Wait up to 5 minutes

const results = await conn.bulk.load('Account', 'insert', largeDataSet);
```
Handling Large Result Sets
```javascript
const job = conn.bulk.createJob('Account', 'query');
await job.open();

const batch = job.createBatch();
const soql = 'SELECT Id, Name FROM Account';
batch.execute(soql);

batch.on('response', async results => {
  // Query can return multiple result sets
  for (const result of results) {
    const resultStream = batch.result(result.id);
    resultStream.on('record', record => {
      // Process each record
      console.log(record.Name);
    });
  }
});

await job.close();
```
CSV Format
Bulk API v1 uses CSV format with these specifications:
First line must contain field names
Fields separated by commas
Values with commas, quotes, or newlines must be enclosed in double quotes
Null values represented as #N/A
Boolean values as true or false strings
```csv
Name,Industry,NumberOfEmployees,Active__c
Acme Corp,Technology,500,true
"Company, Inc.",Finance,1000,false
Example LLC,Healthcare,#N/A,true
```
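The quoting and null rules above can be sketched as a small serializer (an illustrative helper, not the library's internal CSV writer):

```javascript
// Serialize records to Bulk-API-style CSV: header from the first record's
// fields, values quoted when they contain commas, quotes, or newlines,
// embedded quotes doubled, and null/undefined written as #N/A.
function toBulkCsv(records) {
  const fields = Object.keys(records[0]);
  const escape = (value) => {
    if (value === null || value === undefined) return '#N/A';
    const s = String(value);
    return /[",\n]/.test(s) ? '"' + s.replace(/"/g, '""') + '"' : s;
  };
  const lines = [fields.join(',')];
  for (const record of records) {
    lines.push(fields.map(f => escape(record[f])).join(','));
  }
  return lines.join('\n');
}
```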
Error Handling
```javascript
try {
  const results = await conn.bulk.load('Account', 'insert', records);

  // Check for partial failures
  const failed = results.filter(r => !r.success);
  if (failed.length > 0) {
    console.log('Failed records:', failed.length);
    failed.forEach(r => {
      console.log('Errors:', r.errors.join(', '));
    });
  }
} catch (err) {
  if (err.name === 'PollingTimeout') {
    console.log('Job timed out');
    console.log('Job ID:', err.jobId);
    console.log('Batch ID:', err.batchId);
    // Job may still be processing - check status later
  } else {
    console.error('Bulk operation failed:', err.message);
  }
}
```
Limits and Considerations
Maximum 10,000 batches per job
Maximum 10 MB per batch (approximately 10,000 records)
Maximum 24 hours job processing time
Input larger than these limits is automatically split into multiple batches
Serial mode processes one batch at a time (slower but respects record locking)
Parallel mode processes multiple batches simultaneously (faster)
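If you manage batches yourself, the 10,000-record guideline above suggests chunking input on the client (a hypothetical helper; the splitting described above otherwise handles this for you):

```javascript
// Split a large record array into chunks that each stay under the
// per-batch record guideline; each chunk can go to job.createBatch().execute().
function chunkRecords(records, maxPerBatch = 10000) {
  const batches = [];
  for (let i = 0; i < records.length; i += maxPerBatch) {
    batches.push(records.slice(i, i + maxPerBatch));
  }
  return batches;
}
```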
See Also