Introduction
Streaming allows you to display partial results as they're generated, providing:

- Better user experience - Users see progress immediately
- Lower perceived latency - Responses feel faster
- Real-time feedback - Monitor agent reasoning and tool calls
- Cancellation support - Stop generation early if needed
Quick Start
Stream responses from a chat model:

```typescript
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-4o",
  streaming: true
});

const stream = await model.stream(
  "Write a short poem about the ocean"
);

for await (const chunk of stream) {
  process.stdout.write(chunk.content);
}
```
Streaming from Chat Models
Basic Streaming
Stream text as it's generated:

```typescript
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage } from "@langchain/core/messages";

const model = new ChatOpenAI({
  model: "gpt-4o",
  streaming: true,
  temperature: 0.7
});

const stream = await model.stream([
  new HumanMessage("Tell me a story about a robot")
]);

let fullResponse = "";

for await (const chunk of stream) {
  const content = chunk.content;
  process.stdout.write(content);
  fullResponse += content;
}

console.log("\n\nComplete response:", fullResponse);
```
Streaming with Callbacks
Handle stream events with callbacks:

```typescript
import { ChatOpenAI } from "@langchain/openai";

// streaming must be enabled for token-level callbacks to fire with invoke()
const model = new ChatOpenAI({ model: "gpt-4o", streaming: true });

const response = await model.invoke(
  "Write a haiku about programming",
  {
    callbacks: [
      {
        handleLLMNewToken(token: string) {
          process.stdout.write(token);
        },
        handleLLMEnd(output) {
          console.log("\n\nStreaming complete!");
          console.log("Token usage:", output.llmOutput?.tokenUsage);
        }
      }
    ]
  }
);
```
Streaming with Structured Output
Stream structured responses:

```typescript
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";

const model = new ChatOpenAI({ model: "gpt-4o" });

const StorySchema = z.object({
  title: z.string(),
  characters: z.array(z.string()),
  plot: z.string(),
  moral: z.string()
});

const structuredModel = model.withStructuredOutput(StorySchema);

// Note: structured output is returned all at once
const story = await structuredModel.invoke(
  "Create a short fable"
);

console.log(story);
```
Streaming from Chains
Simple Chain Streaming
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";

const prompt = PromptTemplate.fromTemplate(
  "Write a {length} paragraph about {topic}"
);

const model = new ChatOpenAI({ model: "gpt-4o" });

const chain = RunnableSequence.from([
  prompt,
  model,
  new StringOutputParser()
]);

const stream = await chain.stream({
  length: "short",
  topic: "quantum computing"
});

for await (const chunk of stream) {
  process.stdout.write(chunk);
}
```
Multi-Step Chain Streaming
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";

const topicChain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Generate 3 topics about {subject}"),
  new ChatOpenAI({ model: "gpt-4o-mini" }),
  new StringOutputParser()
]);

const essayChain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Write an essay about: {topics}"),
  new ChatOpenAI({ model: "gpt-4o" }),
  new StringOutputParser()
]);

const fullChain = RunnableSequence.from([
  { topics: topicChain },
  essayChain
]);

const stream = await fullChain.stream({
  subject: "artificial intelligence"
});

for await (const chunk of stream) {
  process.stdout.write(chunk);
}
```
Streaming from Agents
Basic Agent Streaming
```typescript
import { createAgent, tool, HumanMessage } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";

const weatherTool = tool(
  ({ location }) => `Weather in ${location}: 72°F, sunny`,
  {
    name: "get_weather",
    description: "Get weather for a location",
    schema: z.object({
      location: z.string()
    })
  }
);

const agent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4o" }),
  tools: [weatherTool]
});

const stream = await agent.stream(
  { messages: [new HumanMessage("What's the weather in Tokyo?")] },
  { streamMode: "values" }
);

for await (const chunk of stream) {
  const lastMessage = chunk.messages[chunk.messages.length - 1];

  if (lastMessage._getType() === "human") {
    console.log("Human:", lastMessage.content);
  } else if (lastMessage._getType() === "ai") {
    if (lastMessage.content) {
      console.log("AI:", lastMessage.content);
    }
    if ("tool_calls" in lastMessage && lastMessage.tool_calls?.length > 0) {
      console.log("Tool calls:", lastMessage.tool_calls);
    }
  } else if (lastMessage._getType() === "tool") {
    console.log("Tool result:", lastMessage.content);
  }
}
```
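The per-type dispatch above can be factored into a small helper. `describeMessage` below is a hypothetical utility, written against a minimal structural message shape rather than LangChain's real message classes, to show the mapping on its own:

```typescript
// Hypothetical helper mirroring the dispatch above; uses a minimal
// structural type instead of LangChain's message classes.
interface MessageLike {
  type: "human" | "ai" | "tool";
  content: string;
  tool_calls?: { name: string }[];
}

function describeMessage(msg: MessageLike): string {
  switch (msg.type) {
    case "human":
      return `Human: ${msg.content}`;
    case "ai":
      // Surface pending tool calls alongside any text content
      if (msg.tool_calls && msg.tool_calls.length > 0) {
        return `AI (calling ${msg.tool_calls.map((t) => t.name).join(", ")})`;
      }
      return `AI: ${msg.content}`;
    case "tool":
      return `Tool result: ${msg.content}`;
  }
}
```

With a helper like this, the stream loop collapses to `console.log(describeMessage(lastMessage))`, which keeps the rendering logic testable apart from the agent.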
Stream Modes
Different ways to stream agent execution:

```typescript
// Stream complete state after each step
const valuesStream = await agent.stream(
  { messages: input },
  { streamMode: "values" }
);

for await (const state of valuesStream) {
  console.log("Full state:", state.messages.length, "messages");
}

// Stream only updates (deltas)
const updatesStream = await agent.stream(
  { messages: input },
  { streamMode: "updates" }
);

for await (const update of updatesStream) {
  const [nodeId, nodeUpdate] = Object.entries(update)[0];
  console.log(`Update from ${nodeId}:`, nodeUpdate);
}

// Stream debug information
const debugStream = await agent.stream(
  { messages: input },
  { streamMode: "debug" }
);

for await (const debugInfo of debugStream) {
  console.log("Debug:", debugInfo);
}
```
Streaming Agent Tokens
Stream individual tokens from agent responses:

```typescript
import { createAgent, HumanMessage } from "langchain";
import { ChatOpenAI } from "@langchain/openai";

const agent = createAgent({
  model: new ChatOpenAI({
    model: "gpt-4o",
    streaming: true
  }),
  tools: []
});

const stream = await agent.stream(
  { messages: [new HumanMessage("Write a poem")] },
  { streamMode: "messages" }
);

for await (const [message, metadata] of stream) {
  if (message.content) {
    process.stdout.write(message.content);
  }
}
```
Stream Events API
Get fine-grained control over streaming:

```typescript
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o" });

const eventStream = await model.streamEvents(
  "Tell me about TypeScript",
  { version: "v2" }
);

for await (const event of eventStream) {
  if (event.event === "on_chat_model_start") {
    console.log("\nModel started");
  } else if (event.event === "on_chat_model_stream") {
    const content = event.data?.chunk?.content;
    if (content) {
      process.stdout.write(content);
    }
  } else if (event.event === "on_chat_model_end") {
    console.log("\nModel finished");
    console.log("Usage:", event.data?.output?.usage_metadata);
  }
}
```
Filtering Stream Events
Filter events by type or tag:

```typescript
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";

const chain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Explain {topic}"),
  new ChatOpenAI({ model: "gpt-4o" }).withConfig({
    tags: ["my-model"]
  })
]);

const eventStream = await chain.streamEvents(
  { topic: "recursion" },
  {
    version: "v2",
    includeNames: ["ChatOpenAI"], // Only these components
    includeTags: ["my-model"] // Only these tags
  }
);

for await (const event of eventStream) {
  console.log(event.event, event.name);
}
```
Streaming HTTP Responses
Stream to HTTP clients:

Express.js

```typescript
import express from "express";
import { ChatOpenAI } from "@langchain/openai";

const app = express();
const model = new ChatOpenAI({ model: "gpt-4o" });

app.get("/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const stream = await model.stream(
    req.query.message as string
  );

  for await (const chunk of stream) {
    res.write(`data: ${JSON.stringify({ content: chunk.content })}\n\n`);
  }

  res.write("data: [DONE]\n\n");
  res.end();
});

app.listen(3000);
```
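On the client, the `data:` frames emitted above have to be split back out of the byte stream, and a network chunk can end mid-frame. `parseSSEFrames` below is a hypothetical standalone helper (not part of LangChain) that extracts complete payloads from accumulated text, assuming the `data: ...\n\n` framing used in the Express handler:

```typescript
// Hypothetical SSE frame parser for the `data: ...\n\n` framing above.
// Returns the parsed payloads plus any trailing partial frame, which the
// caller should prepend to the next network chunk.
function parseSSEFrames(bufferText: string): { frames: string[]; rest: string } {
  const parts = bufferText.split("\n\n");
  // The last element is either "" (clean frame boundary) or an incomplete frame
  const rest = parts.pop() ?? "";
  const frames = parts
    .filter((p) => p.startsWith("data: "))
    .map((p) => p.slice("data: ".length));
  return { frames, rest };
}
```

A fetch-based consumer would decode each chunk, run it through this parser (carrying `rest` forward between reads), and stop when it sees the `[DONE]` sentinel.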
Next.js API Route
```typescript
// app/api/chat/route.ts
import { ChatOpenAI } from "@langchain/openai";
import { StreamingTextResponse } from "ai";

export async function POST(req: Request) {
  const { message } = await req.json();

  const model = new ChatOpenAI({ model: "gpt-4o" });
  const stream = await model.stream(message);

  // Convert to a ReadableStream of text
  const readableStream = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        controller.enqueue(chunk.content);
      }
      controller.close();
    }
  });

  return new StreamingTextResponse(readableStream);
}
```
Vercel AI SDK Integration
```typescript
import { ChatOpenAI } from "@langchain/openai";
import { LangChainAdapter } from "ai";

export async function POST(req: Request) {
  const model = new ChatOpenAI({ model: "gpt-4o" });
  const stream = await model.stream("Tell me a joke");

  // Convert to a Vercel AI SDK stream response
  return LangChainAdapter.toDataStreamResponse(stream);
}
```
Cancellation
Cancel streaming operations:

```typescript
import { ChatOpenAI } from "@langchain/openai";

const controller = new AbortController();
const model = new ChatOpenAI({ model: "gpt-4o" });

const stream = await model.stream(
  "Write a long essay",
  { signal: controller.signal }
);

// Cancel after 2 seconds
setTimeout(() => {
  controller.abort();
  console.log("\nStream cancelled!");
}, 2000);

try {
  for await (const chunk of stream) {
    process.stdout.write(chunk.content);
  }
} catch (error) {
  if ((error as Error).name === "AbortError") {
    console.log("Stream was aborted");
  }
}
```
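In a server setting you often need to cancel on whichever of several conditions fires first, e.g. a client disconnect or a hard deadline. `linkSignals` below is a hypothetical helper that merges abort signals (recent Node releases provide `AbortSignal.any` with the same behavior, and `AbortSignal.timeout(ms)` for the deadline side):

```typescript
// Hypothetical helper: returns a signal that aborts as soon as any of the
// input signals aborts. Mirrors AbortSignal.any() on newer Node versions.
function linkSignals(...signals: AbortSignal[]): AbortSignal {
  const controller = new AbortController();
  for (const signal of signals) {
    if (signal.aborted) {
      // An input is already aborted; propagate immediately
      controller.abort(signal.reason);
      break;
    }
    signal.addEventListener("abort", () => controller.abort(signal.reason), {
      once: true,
    });
  }
  return controller.signal;
}
```

Usage sketch: pass `{ signal: linkSignals(userController.signal, AbortSignal.timeout(30_000)) }` to `model.stream(...)` so the stream stops on user cancellation or after 30 seconds, whichever comes first.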
Buffering and Batching
Buffer Tokens
Collect tokens before displaying:

```typescript
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o" });
const stream = await model.stream("Write a story");

let buffer = "";
const bufferSize = 10; // Characters

for await (const chunk of stream) {
  buffer += chunk.content;
  if (buffer.length >= bufferSize) {
    process.stdout.write(buffer);
    buffer = "";
  }
}

// Flush remaining buffer
if (buffer) {
  process.stdout.write(buffer);
}
```
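The same buffering logic can be wrapped as a reusable transform. `bufferByLength` below is a hypothetical generator (not a LangChain API) that works over any string stream, flushing whenever the buffered text reaches the threshold and emitting the remainder when the source ends:

```typescript
// Hypothetical transform: groups small string chunks into larger ones,
// flushing whenever at least minLength characters have accumulated.
async function* bufferByLength(
  source: AsyncIterable<string>,
  minLength = 10
): AsyncGenerator<string> {
  let buffer = "";
  for await (const piece of source) {
    buffer += piece;
    if (buffer.length >= minLength) {
      yield buffer;
      buffer = "";
    }
  }
  // Flush whatever is left when the source ends
  if (buffer) yield buffer;
}
```

This composes directly with the model stream, e.g. `for await (const block of bufferByLength(textStream, 10))`, assuming a stream of plain strings such as the output of a `StringOutputParser` chain.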
Batch Updates
Send updates at intervals:

```typescript
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o" });
const stream = await model.stream("Write a story");

let accumulated = "";
let lastUpdate = Date.now();
const updateInterval = 100; // ms

for await (const chunk of stream) {
  accumulated += chunk.content;
  const now = Date.now();
  if (now - lastUpdate >= updateInterval) {
    console.log("Update:", accumulated);
    lastUpdate = now;
  }
}

console.log("Final:", accumulated);
```
Streaming Best Practices
Handle Backpressure

Manage slow consumers:

```typescript
async function* streamWithBackpressure<T>(
  source: AsyncIterable<T>,
  maxBuffer = 10
): AsyncGenerator<T> {
  const buffer: T[] = [];

  for await (const chunk of source) {
    buffer.push(chunk);
    // Yield in batches once the buffer is full
    if (buffer.length >= maxBuffer) {
      yield* buffer;
      buffer.length = 0;
    }
  }

  // Yield remaining
  if (buffer.length > 0) {
    yield* buffer;
  }
}
```
Implement Timeouts

Prevent hanging streams:

```typescript
async function streamWithTimeout<T>(
  stream: AsyncIterable<T>,
  timeoutMs: number
): Promise<T[]> {
  const timeout = new Promise<never>((_, reject) => {
    setTimeout(() => reject(new Error("Stream timeout")), timeoutMs);
  });

  const chunks: T[] = [];
  try {
    await Promise.race([
      (async () => {
        for await (const chunk of stream) {
          chunks.push(chunk);
        }
      })(),
      timeout
    ]);
  } catch (error) {
    console.error("Stream timed out");
  }
  return chunks;
}
```
Add Error Recovery

Handle streaming errors gracefully:

```typescript
import { ChatOpenAI } from "@langchain/openai";

async function robustStream(
  model: ChatOpenAI,
  input: string,
  maxRetries: number = 3
) {
  let attempt = 0;

  while (attempt < maxRetries) {
    try {
      const stream = await model.stream(input);
      for await (const chunk of stream) {
        process.stdout.write(chunk.content);
      }
      return; // Success
    } catch (error) {
      attempt++;
      console.error(`Stream error (attempt ${attempt}):`, error);
      if (attempt >= maxRetries) {
        throw error;
      }
      // Exponential backoff before retrying
      await new Promise(resolve =>
        setTimeout(resolve, Math.pow(2, attempt) * 1000)
      );
    }
  }
}
```
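The backoff arithmetic is worth pulling out so it can be tuned and tested on its own. `backoffDelayMs` below is a hypothetical helper that computes the same `2^attempt * base` delay as the retry loop above, with an added cap so late retries don't wait unreasonably long:

```typescript
// Hypothetical helper: exponential backoff (2^attempt * base), capped.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30_000): number {
  return Math.min(Math.pow(2, attempt) * baseMs, capMs);
}
```

Inside the retry loop this replaces the inline expression: `setTimeout(resolve, backoffDelayMs(attempt))`. Adding random jitter on top is a common refinement when many clients retry against the same backend.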
Monitor Performance

Track streaming metrics:

```typescript
class StreamMonitor {
  private startTime = 0;
  private chunkCount = 0;
  private totalChars = 0;

  start() {
    this.startTime = Date.now();
  }

  trackChunk(content: string) {
    this.chunkCount++;
    this.totalChars += content.length;
  }

  getMetrics() {
    const duration = Date.now() - this.startTime;
    return {
      duration,
      chunkCount: this.chunkCount,
      totalChars: this.totalChars,
      charsPerSecond: (this.totalChars / duration) * 1000,
      avgChunkSize: this.totalChars / this.chunkCount
    };
  }
}

// Usage with an active chunk stream:
const monitor = new StreamMonitor();
monitor.start();

for await (const chunk of stream) {
  monitor.trackChunk(chunk.content);
  process.stdout.write(chunk.content);
}

console.log(monitor.getMetrics());
```
Use Compression

Compress streamed data for network efficiency:

```typescript
// Builds on the Express example above (assumes `app` and `model` exist)
import { createGzip } from "zlib";
import { pipeline } from "stream";

app.get("/chat", async (req, res) => {
  res.setHeader("Content-Encoding", "gzip");
  res.setHeader("Content-Type", "text/event-stream");

  const gzip = createGzip();
  pipeline(gzip, res, (err) => {
    if (err) console.error("Stream error:", err);
  });

  const stream = await model.stream(req.query.message as string);

  for await (const chunk of stream) {
    gzip.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }

  gzip.end();
});
```
Next Steps

- Building Agents - Stream agent execution and tool calls
- Working with Chat Models - Stream responses from different models
- Callbacks and Tracing - Monitor streaming with callbacks
- Retrieval - Stream RAG responses
