Bulk API v2 provides a simplified and more efficient way to process large data sets in Salesforce. It offers better performance, easier error handling, and a streamlined workflow compared to v1.
Overview
Bulk API v2 is recommended for:
- High-volume data loads (insert, update, upsert, delete)
- Large queries that would timeout with standard API
- Modern applications requiring simpler integration
- Operations where you need detailed success/failure reporting
Key Improvements over v1
- Simplified workflow - No batch management required
- Better performance - Optimized processing engine
- Improved error handling - Separate endpoints for successful, failed, and unprocessed records
- Single upload - All data uploaded in one operation
- Better monitoring - Enhanced job status information
Quick Start
Simple Data Load
const accounts = [
{ Name: 'Acme Corp', Industry: 'Technology', NumberOfEmployees: 500 },
{ Name: 'Global Inc', Industry: 'Finance', NumberOfEmployees: 1000 },
{ Name: 'Innovate LLC', Industry: 'Healthcare', NumberOfEmployees: 250 }
];
const results = await conn.bulk2.loadAndWaitForResults({
object: 'Account',
operation: 'insert',
input: accounts
});
console.log('Successful:', results.successfulResults.length);
console.log('Failed:', results.failedResults.length);
Bulk Query
const soql = 'SELECT Id, Name, Industry FROM Account WHERE CreatedDate = LAST_MONTH';
const recordStream = await conn.bulk2.query(soql);
recordStream.on('record', record => {
console.log(record.Name, record.Industry);
});
Supported Operations
The type of ingest operation to perform:
insert - Create new records
update - Update existing records by Id
upsert - Insert or update based on external ID field
delete - Soft delete records
hardDelete - Permanently delete records from recycle bin
For queries, use the query() method with operation type determined by options:
query - Standard query (default)
queryAll - Query including deleted and archived records
BulkV2 Class
The main entry point for Bulk API v2 operations.
Properties
Polling interval in milliseconds for checking job status.
Maximum time in milliseconds to wait for job completion.
loadAndWaitForResults()
Create, upload, and process a bulk ingest job in one operation.
const results = await conn.bulk2.loadAndWaitForResults({
object: 'Contact',
operation: 'upsert',
externalIdFieldName: 'Email__c',
input: contacts,
pollInterval: 5000,
pollTimeout: 120000
});
// Process results
for (const record of results.successfulResults) {
console.log('Created/Updated:', record.sf__Id);
}
for (const record of results.failedResults) {
console.log('Failed:', record.sf__Error);
}
Parameters:
The Salesforce object API name (e.g., ‘Account’, ‘Contact’)
input
Record[] | Readable | string
required
Data to process (array of records, CSV string, or readable stream)
External ID field for upsert operations
Assignment rule ID for Case or Lead objects
Override default polling interval
Override default polling timeout
Returns: Promise<IngestJobV2Results<S>>
Records that were successfully processed
Whether record was created
Records that failed processing
Records that were not processed (due to job abortion or other issues)
query()
Execute a bulk query and get results as a record stream.
const soql = 'SELECT Id, Name, CreatedDate FROM Account ORDER BY CreatedDate DESC';
const recordStream = await conn.bulk2.query(soql, {
scanAll: false, // Set true for queryAll
pollInterval: 2000,
pollTimeout: 60000
});
recordStream.on('record', record => {
console.log(record);
});
// Or convert to CSV
recordStream.stream('csv').pipe(fs.createWriteStream('output.csv'));
Parameters:
The SOQL query to execute
Query configuration optionsIf true, uses queryAll to include deleted and archived records
columnDelimiter
QueryJobInfoV2['columnDelimiter']
CSV delimiter for results
lineEnding
QueryJobInfoV2['lineEnding']
Line ending style for results
Override default polling interval
Override default polling timeout
Returns: Promise<Parsable<Record>> - A record stream
createJob()
Create an ingest job instance for manual control.
const job = conn.bulk2.createJob({
object: 'Account',
operation: 'insert'
});
await job.open();
await job.uploadData(records);
await job.close();
await job.poll();
const results = await job.getAllResults();
Parameters:
options
NewIngestJobOptions
required
Job configurationSalesforce object API name
External ID field for upsert
Returns: IngestJobV2<S> - An ingest job instance
job()
Get an existing job instance by ID.
const job = conn.bulk2.job('ingest', { id: '750...' });
const jobInfo = await job.check();
console.log(jobInfo.state);
Parameters:
Returns: IngestJobV2<S> or QueryJobV2<S>
IngestJobV2 Class
Represents a Bulk API v2 ingest job.
Properties
The job ID assigned by Salesforce
open()
Create the job in Salesforce.
const job = conn.bulk2.createJob({
object: 'Account',
operation: 'insert'
});
const jobInfo = await job.open();
console.log('Job ID:', jobInfo.id);
Returns: Promise<JobInfoV2> - Job information
Job state (Open, UploadComplete, InProgress, JobComplete, Aborted, Failed)
uploadData()
Upload data to the job.
const records = [
{ Name: 'Company A', Industry: 'Tech' },
{ Name: 'Company B', Industry: 'Finance' }
];
await job.uploadData(records);
Parameters:
input
string | Record[] | Readable
required
Data to upload
Returns: Promise<void>
Data can only be uploaded once per job. Multiple uploads will throw an error.
close()
Mark the job as ready for processing.
Returns: Promise<void>
You must close the job before it will be processed. After closing, you cannot upload more data.
poll()
Poll for job completion.
job.on('inProgress', info => {
console.log('Processed:', info.numberRecordsProcessed);
});
job.on('jobComplete', info => {
console.log('Job finished!');
});
try {
await job.poll(2000, 120000); // Check every 2s for up to 2 minutes
} catch (err) {
if (err.name === 'JobPollingTimeout') {
console.log('Job still processing, check back later');
}
}
Parameters:
Polling interval in milliseconds (defaults to instance pollInterval)
Polling timeout in milliseconds (defaults to instance pollTimeout)
Returns: Promise<void> - Resolves when job completes
check()
Check current job status.
const jobInfo = await job.check();
console.log('State:', jobInfo.state);
console.log('Processed:', jobInfo.numberRecordsProcessed);
console.log('Failed:', jobInfo.numberRecordsFailed);
Returns: Promise<JobInfoV2> - Current job information
getAllResults()
Retrieve all results (successful, failed, and unprocessed records).
const results = await job.getAllResults();
console.log('Success:', results.successfulResults.length);
console.log('Failed:', results.failedResults.length);
console.log('Unprocessed:', results.unprocessedRecords.length);
// Process failed records
for (const failed of results.failedResults) {
console.log(`Record failed: ${failed.sf__Error}`);
console.log('Original data:', failed);
}
Returns: Promise<IngestJobV2Results<S>>
getSuccessfulResults()
Get only successful results.
const successful = await job.getSuccessfulResults();
successful.forEach(record => {
console.log('ID:', record.sf__Id);
console.log('Created:', record.sf__Created === 'true');
});
Parameters:
If true, returns raw CSV string instead of parsed records
Returns: Promise<IngestJobV2SuccessfulResults<S>> or Promise<string>
getFailedResults()
Get only failed results.
const failed = await job.getFailedResults();
failed.forEach(record => {
console.log('Error:', record.sf__Error);
console.log('Original data:', record);
});
Parameters:
If true, returns raw CSV string
Returns: Promise<IngestJobV2FailedResults<S>> or Promise<string>
getUnprocessedRecords()
Get unprocessed records (usually due to job abortion).
const unprocessed = await job.getUnprocessedRecords();
if (Array.isArray(unprocessed)) {
console.log('Unprocessed count:', unprocessed.length);
} else {
console.log('Unprocessed CSV:', unprocessed);
}
Parameters:
If true, returns raw CSV string
Returns: Promise<IngestJobV2UnprocessedRecords<S>>
abort()
Abort the job.
Returns: Promise<void>
delete()
Delete the job from Salesforce.
Returns: Promise<void>
getInfo()
Get cached job information.
const jobInfo = job.getInfo();
console.log('Cached state:', jobInfo.state);
Returns: JobInfoV2 - Cached job info
Throws error if no info is cached. Call await job.check() first.
QueryJobV2 Class
Represents a Bulk API v2 query job.
Properties
open()
Create the query job in Salesforce.
const queryJob = new QueryJobV2(conn, {
bodyParams: {
query: 'SELECT Id, Name FROM Account',
operation: 'query'
},
pollingOptions: { pollInterval: 1000, pollTimeout: 30000 }
});
await queryJob.open();
Returns: Promise<QueryJobInfoV2>
poll()
Wait for query to complete.
Parameters:
Polling interval in milliseconds
Polling timeout in milliseconds
Returns: Promise<void>
result()
Get query results as a record stream.
const resultStream = await queryJob.result();
resultStream.on('record', record => {
console.log(record);
});
Returns: Promise<Parsable<Record>> - Record stream with query results
check()
Check query job status.
const jobInfo = await queryJob.check();
console.log('State:', jobInfo.state);
Returns: Promise<QueryJobInfoV2>
abort()
Abort the query job.
Returns: Promise<QueryJobInfoV2>
delete()
Delete the query job.
Returns: Promise<void>
getInfo()
Get cached query job information.
const jobInfo = queryJob.getInfo();
Returns: QueryJobInfoV2
Events
Both IngestJobV2 and QueryJobV2 extend EventEmitter.
IngestJobV2 Events
const job = conn.bulk2.createJob({
object: 'Account',
operation: 'insert'
});
job.on('open', () => {
console.log('Job opened:', job.id);
});
job.on('close', () => {
console.log('Job closed and ready for processing');
});
job.on('inProgress', jobInfo => {
console.log('Processing...', jobInfo.numberRecordsProcessed);
});
job.on('jobComplete', jobInfo => {
console.log('Job completed!');
});
job.on('aborted', () => {
console.log('Job aborted');
});
job.on('error', err => {
console.error('Job error:', err);
});
QueryJobV2 Events
queryJob.on('open', jobInfo => {
console.log('Query job opened:', jobInfo.id);
});
queryJob.on('inProgress', jobInfo => {
console.log('Query in progress...');
});
queryJob.on('jobComplete', jobInfo => {
console.log('Query completed!');
});
queryJob.on('error', err => {
console.error('Query error:', err);
});
Advanced Usage
Manual Job Control
const job = conn.bulk2.createJob({
object: 'Contact',
operation: 'upsert',
externalIdFieldName: 'Email__c'
});
try {
// Step 1: Open the job
await job.open();
console.log('Job created:', job.id);
// Step 2: Upload data
await job.uploadData(contacts);
console.log('Data uploaded');
// Step 3: Close job to start processing
await job.close();
console.log('Job closed, processing started');
// Step 4: Poll for completion
await job.poll();
console.log('Job completed');
// Step 5: Get results
const results = await job.getAllResults();
console.log('Success:', results.successfulResults.length);
console.log('Failed:', results.failedResults.length);
} catch (err) {
console.error('Job failed:', err);
// Clean up
try {
await job.abort();
await job.delete();
} catch (cleanupErr) {
// Ignore cleanup errors
}
}
Streaming Large Files
const fs = require('fs');
const csvStream = fs.createReadStream('large-dataset.csv');
const job = conn.bulk2.createJob({
object: 'Account',
operation: 'insert'
});
await job.open();
await job.uploadData(csvStream);
await job.close();
// Monitor progress
job.on('inProgress', info => {
const progress = (info.numberRecordsProcessed / info.numberRecordsProcessed) * 100;
console.log(`Progress: ${progress.toFixed(1)}%`);
});
await job.poll(5000, 600000); // Poll every 5s for up to 10 minutes
const results = await job.getAllResults();
const job = conn.bulk2.createJob({
object: 'Account',
operation: 'insert',
columnDelimiter: 'PIPE', // Use | instead of comma
lineEnding: 'CRLF' // Windows line endings
});
const csv = `Name|Industry|NumberOfEmployees
Company A|Tech|100
Company B|Finance|200`;
await job.open();
await job.uploadData(csv);
await job.close();
await job.poll();
Query with All Records
// Include deleted and archived records
const soql = 'SELECT Id, Name, IsDeleted FROM Account';
const recordStream = await conn.bulk2.query(soql, {
scanAll: true // Use queryAll operation
});
let deletedCount = 0;
recordStream.on('record', record => {
if (record.IsDeleted) deletedCount++;
});
recordStream.on('end', () => {
console.log('Deleted records found:', deletedCount);
});
Retry Failed Records
const results = await conn.bulk2.loadAndWaitForResults({
object: 'Account',
operation: 'insert',
input: records
});
if (results.failedResults.length > 0) {
console.log('Retrying failed records...');
// Extract failed records and fix issues
const retryRecords = results.failedResults.map(record => {
const { sf__Error, sf__Id, ...originalData } = record;
// Fix common issues
if (sf__Error.includes('REQUIRED_FIELD_MISSING')) {
originalData.RequiredField = 'Default Value';
}
return originalData;
});
// Retry
const retryResults = await conn.bulk2.loadAndWaitForResults({
object: 'Account',
operation: 'insert',
input: retryRecords
});
console.log('Retry success:', retryResults.successfulResults.length);
}
Error Handling
try {
const results = await conn.bulk2.loadAndWaitForResults({
object: 'Account',
operation: 'insert',
input: records
});
// Check for failures
if (results.failedResults.length > 0) {
console.log('Some records failed:');
results.failedResults.forEach(record => {
console.log(` Error: ${record.sf__Error}`);
console.log(` Data: ${JSON.stringify(record)}`);
});
}
} catch (err) {
if (err.name === 'JobPollingTimeout') {
console.log('Job timed out but may still be processing');
console.log('Job ID:', err.jobId);
// Reconnect to job later
const job = conn.bulk2.job('ingest', { id: err.jobId });
const info = await job.check();
console.log('Current state:', info.state);
} else {
console.error('Bulk operation failed:', err.message);
}
}
Limits and Considerations
Bulk API v2 Limits:
- Maximum 150 MB data upload per job
- Maximum 150,000,000 characters per job
- Records must not exceed 400KB each
- Maximum 24 hours processing time
- Jobs are automatically deleted after 7 days
- Query results available for 7 days
Comparing v1 and v2
| Feature | Bulk v1 | Bulk v2 |
|---|
| Batch Management | Manual (multiple batches) | Automatic (single upload) |
| Data Upload | Multiple batch uploads | Single upload |
| Job Closure | Manual close required | Manual close required |
| Result Retrieval | Single result call | Separate success/failure/unprocessed endpoints |
| Error Details | Limited | Detailed with original data |
| Performance | Good | Better (optimized engine) |
| Max File Size | 10 MB per batch | 150 MB per job |
| API Complexity | More complex | Simpler workflow |
When to Use v1
- Need batch-level control
- Processing extremely large datasets that exceed v2 limits
- Using serial concurrency mode for record locking
- Working with legacy systems
When to Use v2
- Most modern use cases (recommended)
- Need better error handling
- Want simpler integration
- Processing up to 150 MB per job
- Need detailed success/failure reporting
See Also