I've done it. Everyone does it. You're building a new feature, you wire up the standard /chat/completions or generateContent endpoint, it works, you ship. Somewhere in the back of your mind, a voice whispers: "I should probably use the batch API for this, it's half the price." You add a TODO comment. You move on. Six months later, the TODO is still there, and you've spent twice what you needed to.
The batch pattern is not that hard. It's the same across every provider I've worked with. Once you've built it once, you can transplant it anywhere.
The Universal Batch Recipe
Every batch API I've used works the same way:
1. Submit: build a JSONL file (one JSON object per line, each containing a request), upload it somewhere the provider can read it, and call a "create batch job" endpoint. You get back a job ID.
2. Wait: the job runs asynchronously. Minutes or hours. No callback in most cases, you need to poll the provider to know when it's done.
3. Collect: download the results (another JSONL file, one response per line matched by key/ID), process them.
I usually insert this workflow in a Step Functions workflow. Step 2 is where the architecture decisions happen. Two clean options in a serverless setup:
- SQS polling: a Lambda re-enqueues itself every N minutes, checks the job status, and notifies Step Functions when done.
- EventBridge callback: the provider (or AWS itself) emits an event when the job completes, which triggers a Lambda that notifies Step Functions.
Pattern 1: Direct API Batch With SQS Polling
This pattern works with any provider whose batch API you call directly (Vertex AI, Mistral, OpenAI, etc.). Your Lambda polls the provider's API at regular intervals until the job finishes.
This example submits a batch of multimodal image-generation requests to Vertex AI (Gemini). Each request contains a text prompt plus reference images. In this example, images stored on S3 are first uploaded to GCS before submitting the batch.
The Submit Step
First, upload the images to GCS:
/**
* Upload face images from S3 to GCS before batch submission.
* Vertex AI can only read files from GCS, so this cross-cloud
* transfer is required when your primary storage is S3.
*/
// Minimal types for this example
interface PackEntry {
packIndex: number;
images: Array<{ cropKey: string }>;
}
// Bucket is from @google-cloud/storage
async function uploadFacesToGCS(
packs: PackEntry[],
gcsBucket: Bucket,
s3: S3Client,
s3BucketName: string,
jobId: string,
): Promise<void> {
const seen = new Set<string>();
for (const pack of packs) {
for (const image of pack.images) {
if (seen.has(image.cropKey)) continue;
seen.add(image.cropKey);
// Download from S3
const response = await s3.send(
new GetObjectCommand({ Bucket: s3BucketName, Key: image.cropKey }),
);
const body = await response.Body?.transformToByteArray();
if (!body) throw new Error(`Empty body for S3 key: ${image.cropKey}`);
// Flatten the S3 key: "private/abc/faces/crop1.jpg" → "private_abc_faces_crop1.jpg"
const gcsKey = image.cropKey.replace(/\//g, "_");
await gcsBucket.file(`${jobId}/faces/${gcsKey}`).save(
Buffer.from(body),
{ metadata: { contentType: "image/jpeg" } },
);
}
}
}
This is the submit Lambda for a Vertex AI (Gemini) image generation batch. It builds a JSONL from a set of prompts, uploads it to GCS, and submits the job:
// Build a JSONL line for a single batch prediction request
function buildJsonlLine(
prompt: PromptEntry,
pack: PackEntry,
gcsBucketName: string,
jobId: string,
): string {
const parts: Array<Record<string, unknown>> = [{ text: prompt.prompt }];
// Attach each reference face image as a GCS file reference.
// The gcsKey must match the flattened key used during upload.
for (const image of pack.images) {
const gcsKey = image.cropKey.replace(/\//g, "_");
parts.push({
fileData: {
fileUri: `gs://${gcsBucketName}/${jobId}/faces/${gcsKey}`,
mimeType: "image/jpeg",
},
});
}
const request = {
contents: [{ role: "user", parts }],
generation_config: {
responseModalities: ["IMAGE"],
imageConfig: { aspectRatio: "3:4", imageSize: "1K" },
},
};
return JSON.stringify({ key: prompt.requestId, request });
}
The key detail: each JSONL line has a key field (a UUID). This is your correlation ID. When you collect results later, the output JSONL will include this same key, so you know which response maps to which request.
The submission itself uses the Google GenAI SDK:
// 1. Upload face images from S3 to GCS
await uploadFacesToGCS(packs, gcsBucket, s3, s3BucketName, jobId);
// 2. Upload JSONL to GCS
const jsonlContent = jsonlLines.join("\n");
const inputGcsPath = `batch/${jobId}/input.jsonl`;
await gcsBucket.file(inputGcsPath).save(
Buffer.from(jsonlContent, "utf-8"),
{ metadata: { contentType: "application/jsonl" } },
);
// 3. Submit batch job via SDK
const batchJob = await ai.batches.create({
model: `publishers/google/models/gemini-3-pro-image-preview`,
src: `gs://${gcsBucketName}/${inputGcsPath}`,
config: {
displayName: jobDisplayName,
dest: `gs://${gcsBucketName}/batch/${jobId}/output/`,
},
});
An important detail: idempotence. Before submitting, check if a job already exists for this input. Batch jobs can take hours; if your Lambda retries (timeout, transient error), you don't want to double-submit:
// Idempotence check: skip if job already exists
const existingJobs = await ai.batches.list({
config: { filter: `display_name="${jobDisplayName}"` },
});
for await (const job of existingJobs) {
const state = job.state ?? "";
if (["JOB_STATE_RUNNING", "JOB_STATE_PENDING", "JOB_STATE_SUCCEEDED"]
.includes(state)) {
return { success: true, data: { batchJobName: job.name, reused: true } };
}
}
The Poll Step (SQS self-re-enqueue)
This is the part that looks weird the first time you see it: a Lambda that sends a message to its own trigger queue.
Step Functions sends a message (with a task token) to an SQS queue. The poll Lambda picks it up, checks the batch job status, and either:
- Calls
SendTaskSuccess/SendTaskFailureif the job is done - Re-enqueues the message with a delay if it's still running
export async function lambda_handler(event: SQSEvent): Promise<void> {
const record = event.Records[0];
const message = JSON.parse(record.body) as PollMessage;
const ai = await createGoogleAI();
const batchJob = await ai.batches.get({ name: message.batchJobName });
const jobState = batchJob.state ?? "UNKNOWN";
switch (jobState) {
case "JOB_STATE_SUCCEEDED":
case "JOB_STATE_PARTIALLY_SUCCEEDED":
await sendTaskResult(message.taskToken, true, {
success: true,
data: {
batchJobName: message.batchJobName,
outputGcsDirectory: `gs://${message.gcsBucketName}/batch/${message.jobId}/output/`,
},
});
break;
case "JOB_STATE_FAILED":
case "JOB_STATE_CANCELLED":
await sendTaskResult(message.taskToken, false, {
state: jobState,
});
break;
case "JOB_STATE_RUNNING":
case "JOB_STATE_PENDING":
// Re-enqueue for another poll in 5 minutes
await reEnqueue(queueUrl, message, 300);
break;
}
}
A few guardrails you need:
- Max poll attempts: a counter in the message body, incremented on each re-enqueue. After N attempts (say 320, which gives ~26h at 5min intervals), fail the workflow.
- Stale task tokens: Step Functions tokens expire. If
SendTaskSuccessthrowsTaskTimedOutorInvalidToken, just discard the message. The workflow has already timed out on its side. - DLQ on the SQS queue: if something goes fundamentally wrong, messages land in the dead letter queue instead of looping forever.
- Recursive loop protection: AWS detects Lambda → SQS → Lambda loops and kills them. Our polling pattern looks exactly like one. Set
recursiveLoop: lambda.RecursiveLoop.ALLOWon the Lambda construct, or AWS terminates the chain after a few invocations (you'll get a notification, but your workflow is already dead).
The CDK wiring for the poll Lambda needs three things: the recursive loop opt-in, the SQS event source, and send-message permission back to its own queue:
const pollingLambda = new lambda.Function(this, "BatchPoll", {
// ...handler, runtime, etc.
environment: {
POLL_QUEUE_URL: pollQueue.queueUrl,
},
recursiveLoop: lambda.RecursiveLoop.ALLOW, // <-- without this, AWS kills the loop
});
// SQS trigger: one message at a time
pollingLambda.addEventSource(
new SqsEventSource(pollQueue, { batchSize: 1 }),
);
// Lambda needs permission to re-enqueue messages to its own queue
pollQueue.grantSendMessages(pollingLambda);
And the Step Functions step that sends the initial message and freezes until the poll Lambda calls SendTaskSuccess:
const pollStep = new tasks.SqsSendMessage(this, "WaitForBatchCompletion", {
queue: pollQueue,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
messageBody: sfn.TaskInput.fromObject({
taskToken: sfn.JsonPath.taskToken,
"batchJobName.$": "$.stepResults.BatchSubmit.output.data.batchJobName",
"jobId.$": "$.jobId",
}),
resultPath: "$.stepResults.WaitForBatchCompletion",
taskTimeout: sfn.Timeout.duration(Duration.hours(26)),
});
The Collect Step
The collect step is the least interesting. Download the output JSONL from GCS, parse each line, match it back to your original request by key, and handle the result. For this example, that's extracting base64 images and writing them to S3. The output has the same shape as the input: one JSON object per line, with the same key you submitted.
I've used this exact same submit/poll/collect pattern with Mistral's batch API, and it maps 1-to-1: different SDK, same JSONL-in/JSONL-out structure, same poll Lambda logic. Once you've built it for one provider, adapting it to another is maybe an hour of work.
Pattern 2: AWS Bedrock Batch With EventBridge
When you're using AWS Bedrock's own batch inference, you get a bonus: Bedrock emits EventBridge events when jobs complete. No polling needed.
The Submit Step
Bedrock batch expects a JSONL file in S3 with a specific format:
def build_jsonl_records(articles):
records = []
for article in articles:
record = {
"recordId": article['id'],
"modelInput": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"temperature": 0.7,
"system": SYSTEM_PROMPT,
"messages": [
{"role": "user", "content": prompt}
]
}
}
records.append(record)
return records
The submission uses create_model_invocation_job:
response = bedrock.create_model_invocation_job(
jobName=job_name,
modelId=model_id,
roleArn=role_arn, # Bedrock needs an IAM role to read/write S3
inputDataConfig={
"s3InputDataConfig": {
"s3Uri": s3_input_uri,
"s3InputFormat": "JSONL"
}
},
outputDataConfig={
"s3OutputDataConfig": {"s3Uri": s3_output_uri}
},
)
The 100-Entry Minimum (and How to Deal With It)
One gotcha: AWS Bedrock batch requires a minimum of 100 records in your JSONL file. If you have 12 articles to process, you need 88 more lines.
Pad with dummy requests that cost almost nothing:
def pad_jsonl_to_minimum(records, minimum=100):
if len(records) >= minimum:
return records
padding_needed = minimum - len(records)
for i in range(padding_needed):
records.append({
"recordId": f"PADDING_{i:03d}",
"modelInput": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 5, # Minimal output = minimal cost
"messages": [
{"role": "user", "content": "Answer with yes"}
]
}
})
return records
max_tokens: 5 with a trivial prompt. Each padding record costs a fraction of a cent. Your collect step filters out records whose recordId starts with PADDING_.
EventBridge Instead of Polling (But First: What's a Task Token?)
In Pattern 1, Step Functions pauses at the SQS step using waitForTaskToken. A task token is a unique string that Step Functions generates and passes to whatever system you're integrating with. The workflow freezes until someone calls SendTaskSuccess(taskToken) or SendTaskFailure(taskToken) with that token.
In the SQS polling pattern, the token travels inside the SQS message body. The poll Lambda reads it, and when the batch job is done, it calls SendTaskSuccess with it. The token stays in the message loop.
With EventBridge, we have a different problem. Bedrock emits a completion event, but that event doesn't know anything about our Step Functions workflow. It just says "job X finished." So how does the callback Lambda get the task token?
The answer: we store it in S3 as an intermediate step.
The flow:
- CollectRSS Lambda runs: fetches articles, builds the JSONL, submits the Bedrock batch job. Returns the
jobArnand the S3 prefix where the input lives. - StoreTaskToken step (Step Functions
waitForTaskTokenintegration): Step Functions generates a task token and invokes a Lambda, passing both the token and the job metadata. This Lambda writes the token to S3 next to the batch input, then does not return. Step Functions stays frozen, waiting for someone to callSendTaskSuccesswith that token. - Hours pass. Bedrock processes the batch.
- Bedrock finishes → emits an EventBridge event → triggers the BatchCallback Lambda. This Lambda looks up the Bedrock job details to find the S3 input path, reads the task token file from S3, and calls
SendTaskSuccess. Step Functions wakes up and proceeds to the next step.
The "store task token" Lambda just writes JSON to S3:
export async function lambda_handler(event: StoreTaskTokenEvent) {
const { s3Prefix, jobArn, taskToken } = event;
const key = `${s3Prefix}/tasktoken.json`;
await s3.send(new PutObjectCommand({
Bucket: BUCKET,
Key: key,
Body: JSON.stringify({ taskToken, jobArn }),
ContentType: "application/json",
}));
return { success: true, s3Key: key };
}
The CDK wiring for this step uses WAIT_FOR_TASK_TOKEN. Step Functions will not advance past this step until someone externally calls SendTaskSuccess:
const storeTaskTokenStep = new tasks.LambdaInvoke(this, "StoreTaskToken", {
lambdaFunction: storeTaskTokenLambda,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: sfn.TaskInput.fromObject({
"s3Prefix.$": "$.stepResults.CollectRSS.output.s3Prefix",
"jobArn.$": "$.stepResults.CollectRSS.output.jobArn",
taskToken: sfn.JsonPath.taskToken, // <-- SF generates this
}),
heartbeatTimeout: sfn.Timeout.duration(Duration.hours(24)),
taskTimeout: sfn.Timeout.duration(Duration.hours(48)),
});
On the EventBridge side, Bedrock emits a Batch Inference Job State Change event when a job finishes. A rule matches our job name prefix and routes to the callback Lambda:
const batchJobCompletionRule = new events.Rule(this, "BedrockBatchCompletion", {
eventPattern: {
source: ["aws.bedrock"],
detailType: ["Batch Inference Job State Change"],
detail: {
status: ["Completed", "Failed", "Stopped", "Expired"],
batchJobName: [{ prefix: "rss-enrichment-" }],
},
},
});
batchJobCompletionRule.addTarget(
new targets.LambdaFunction(batchCallbackLambda, {
retryAttempts: 2,
deadLetterQueue: callbackDlq,
}),
);
The callback Lambda receives the EventBridge event, finds the task token in S3, and wakes up Step Functions:
export async function lambda_handler(event: BatchJobEvent) {
const { batchJobArn, status } = event.detail;
// Get the job's S3 input path so we can find the stored task token
const jobResponse = await bedrock.send(
new GetModelInvocationJobCommand({ jobIdentifier: batchJobArn }),
);
const s3Prefix = /* derived from jobResponse.inputDataConfig.s3Uri */;
// Read the task token we stored earlier
const tokenKey = `${s3Prefix}/tasktoken.json`;
const tokenObj = await s3.send(
new GetObjectCommand({ Bucket: BUCKET, Key: tokenKey }),
);
const tokenData = JSON.parse(await tokenObj.Body.transformToString());
// Wake up Step Functions
if (status === "Completed") {
await sfn.send(new SendTaskSuccessCommand({
taskToken: tokenData.taskToken,
output: JSON.stringify({ jobArn: batchJobArn, status: "Completed" }),
}));
} else {
await sfn.send(new SendTaskFailureCommand({
taskToken: tokenData.taskToken,
error: "BatchJobFailed",
cause: `Bedrock batch job ${status}: ${batchJobArn}`,
}));
}
}
That's it: no polling loop, no SQS re-enqueue. EventBridge fires once, the callback Lambda runs once, and Step Functions resumes. The only extra cost is one S3 write/read for the token.
When to Use Which
| SQS Polling | EventBridge | |
|---|---|---|
| Works with | Any provider (Vertex AI, Mistral, OpenAI, Anthropic API) | AWS Bedrock only |
| Moving parts | SQS queue + DLQ + poll Lambda | EventBridge rule + callback Lambda + S3 token store |
| Latency | Poll interval (e.g., 5 min) | Near-instant on completion |
| Complexity | Self-re-enqueue logic, max attempt guards | Simpler, but token storage adds a step |
For non-AWS providers, you don't have a choice: use SQS polling. For Bedrock, EventBridge is cleaner.
The Economics
Most providers offer 50% off on batch inference. Gemini, Mistral, Anthropic (through Bedrock or direct), OpenAI. The tradeoff is latency: batch jobs take minutes to hours instead of seconds. If your use case can tolerate that (background processing, nightly enrichment, media generation, transcription pipelines), you're paying double for no reason with the sync API.
On a workload doing 1,000 Haiku calls per day, the savings are obvious by month two. On image generation batches, where individual calls cost real money, it pays for itself on day one.
Ready-to-Deploy Code
Both patterns above are extracted from production systems running on AWS. The full infrastructure code (CDK stacks, Step Functions workflows, Lambda handlers, polling mechanics, EventBridge wiring) lives in Bootwright, a SaaS boilerplate I maintain. It ships the SQS polling pattern for Vertex AI and the EventBridge pattern for Bedrock. Everything is wired into Step Functions with error handling, retries, and DLQ monitoring.
Batch inference is plumbing. You set it up once, copy the pattern to the next provider, and your inference bill drops by half.
