Introduction

Streaming allows you to display partial results as they’re generated, providing:
  • Better user experience - Users see progress immediately
  • Lower perceived latency - Responses feel faster
  • Real-time feedback - Monitor agent reasoning and tool calls
  • Cancellation support - Stop generation early if needed
LangChain.js provides comprehensive streaming support across models, chains, and agents.

Quick Start

Stream responses from a chat model:
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-4o",
  streaming: true
});

const stream = await model.stream(
  "Write a short poem about the ocean"
);

for await (const chunk of stream) {
  process.stdout.write(chunk.content);
}

Streaming from Chat Models

Basic Streaming

Stream text as it’s generated:
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage } from "@langchain/core/messages";

const model = new ChatOpenAI({
  model: "gpt-4o",
  streaming: true,
  temperature: 0.7
});

const stream = await model.stream([
  new HumanMessage("Tell me a story about a robot")
]);

let fullResponse = "";
for await (const chunk of stream) {
  const content = chunk.content;
  process.stdout.write(content);
  fullResponse += content;
}

console.log("\n\nComplete response:", fullResponse);
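The display-and-accumulate pattern above comes up often enough that it can be worth factoring into a helper. A minimal sketch — `collectStream` is an illustrative name, not a LangChain export, and the mock generator stands in for `model.stream(...)`:

```typescript
// Forward each chunk to a callback while accumulating the full text.
async function collectStream(
  stream: AsyncIterable<{ content: string }>,
  onChunk: (text: string) => void
): Promise<string> {
  let full = "";
  for await (const chunk of stream) {
    onChunk(chunk.content);
    full += chunk.content;
  }
  return full;
}

// Mock stream standing in for `await model.stream(...)`.
async function* mockStream() {
  yield { content: "Hello, " };
  yield { content: "world!" };
}

const result = await collectStream(mockStream(), (t) => process.stdout.write(t));
```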

Streaming with Callbacks

Handle stream events with callbacks:
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o", streaming: true });

const response = await model.invoke(
  "Write a haiku about programming",
  {
    callbacks: [
      {
        handleLLMNewToken(token: string) {
          process.stdout.write(token);
        },
        handleLLMEnd(output) {
          console.log("\n\nStreaming complete!");
          console.log("Token usage:", output.llmOutput?.tokenUsage);
        }
      }
    ]
  }
);

Streaming with Structured Output

Stream structured responses:
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";

const model = new ChatOpenAI({ model: "gpt-4o" });

const StorySchema = z.object({
  title: z.string(),
  characters: z.array(z.string()),
  plot: z.string(),
  moral: z.string()
});

const structuredModel = model.withStructuredOutput(StorySchema);

// Note: Structured output is returned all at once
const story = await structuredModel.invoke(
  "Create a short fable"
);

console.log(story);
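Because structured output arrives as one complete object, showing progress requires a different trick: if the model streams raw JSON text, you can re-attempt a parse on the accumulated buffer after each chunk and render the most recent value that parsed successfully. A hypothetical sketch — `makeProgressiveParser` is not a LangChain API:

```typescript
// Accumulate streamed JSON text and keep the latest complete parse.
function makeProgressiveParser<T>() {
  let buffer = "";
  let latest: T | undefined;
  return {
    push(chunk: string): T | undefined {
      buffer += chunk;
      try {
        latest = JSON.parse(buffer) as T;
      } catch {
        // Buffer is not yet valid JSON; keep the previous value.
      }
      return latest;
    },
  };
}

const parser = makeProgressiveParser<{ title: string }>();
parser.push('{"title": "The Clever'); // incomplete — returns undefined
const value = parser.push(' Fox"}');  // now valid JSON
```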

Streaming from Chains

Simple Chain Streaming

import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";

const prompt = PromptTemplate.fromTemplate(
  "Write a {length} paragraph about {topic}"
);

const model = new ChatOpenAI({ model: "gpt-4o" });

const chain = RunnableSequence.from([
  prompt,
  model,
  new StringOutputParser()
]);

const stream = await chain.stream({
  length: "short",
  topic: "quantum computing"
});

for await (const chunk of stream) {
  process.stdout.write(chunk);
}

Multi-Step Chain Streaming

import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";

const topicChain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Generate 3 topics about {subject}"),
  new ChatOpenAI({ model: "gpt-4o-mini" }),
  new StringOutputParser()
]);

const essayChain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Write an essay about: {topics}"),
  new ChatOpenAI({ model: "gpt-4o" }),
  new StringOutputParser()
]);

const fullChain = RunnableSequence.from([
  { topics: topicChain },
  essayChain
]);

const stream = await fullChain.stream({
  subject: "artificial intelligence"
});

for await (const chunk of stream) {
  process.stdout.write(chunk);
}

Streaming from Agents

Basic Agent Streaming

import { createAgent, tool, HumanMessage } from "langchain";
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod";

const weatherTool = tool(
  ({ location }) => `Weather in ${location}: 72°F, sunny`,
  {
    name: "get_weather",
    description: "Get weather for a location",
    schema: z.object({
      location: z.string()
    })
  }
);

const agent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4o" }),
  tools: [weatherTool]
});

const stream = await agent.stream(
  { messages: [new HumanMessage("What's the weather in Tokyo?")] },
  { streamMode: "values" }
);

for await (const chunk of stream) {
  const lastMessage = chunk.messages[chunk.messages.length - 1];
  
  if (lastMessage._getType() === "human") {
    console.log("Human:", lastMessage.content);
  } else if (lastMessage._getType() === "ai") {
    if (lastMessage.content) {
      console.log("AI:", lastMessage.content);
    }
    if ("tool_calls" in lastMessage && (lastMessage.tool_calls?.length ?? 0) > 0) {
      console.log("Tool calls:", lastMessage.tool_calls);
    }
  } else if (lastMessage._getType() === "tool") {
    console.log("Tool result:", lastMessage.content);
  }
}
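The type-based branching in the loop above can be factored into a small formatter. The message shapes here are simplified mocks keyed on the same `_getType()` convention; `describeMessage` is an illustrative name:

```typescript
// Simplified stand-in for LangChain message objects.
type SimpleMessage = {
  _getType: () => "human" | "ai" | "tool";
  content: string;
  tool_calls?: { name: string }[];
};

// Route a message to a one-line description based on its type.
function describeMessage(msg: SimpleMessage): string {
  switch (msg._getType()) {
    case "human":
      return `Human: ${msg.content}`;
    case "ai":
      return msg.tool_calls?.length
        ? `AI (calling ${msg.tool_calls.map((t) => t.name).join(", ")})`
        : `AI: ${msg.content}`;
    case "tool":
      return `Tool result: ${msg.content}`;
  }
}

const line = describeMessage({
  _getType: () => "ai",
  content: "",
  tool_calls: [{ name: "get_weather" }],
});
```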

Stream Modes

Different ways to stream agent execution:
// Stream complete state after each step
const valuesStream = await agent.stream(
  { messages: input },
  { streamMode: "values" }
);

for await (const state of valuesStream) {
  console.log("Full state:", state.messages.length, "messages");
}

// Stream only updates (deltas)
const updatesStream = await agent.stream(
  { messages: input },
  { streamMode: "updates" }
);

for await (const update of updatesStream) {
  const [nodeId, nodeUpdate] = Object.entries(update)[0];
  console.log(`Update from ${nodeId}:`, nodeUpdate);
}

// Stream debug information
const debugStream = await agent.stream(
  { messages: input },
  { streamMode: "debug" }
);

for await (const debugInfo of debugStream) {
  console.log("Debug:", debugInfo);
}

Streaming Agent Tokens

Stream individual tokens from agent responses:
import { createAgent, HumanMessage } from "langchain";
import { ChatOpenAI } from "@langchain/openai";

const agent = createAgent({
  model: new ChatOpenAI({ 
    model: "gpt-4o",
    streaming: true 
  }),
  tools: []
});

const stream = await agent.stream(
  { messages: [new HumanMessage("Write a poem")] },
  { streamMode: "messages" }
);

for await (const [message, metadata] of stream) {
  if (message.content) {
    process.stdout.write(message.content);
  }
}

Stream Events API

Get fine-grained control over streaming:
import { ChatOpenAI } from "@langchain/openai";
import { RunnableSequence } from "@langchain/core/runnables";

const model = new ChatOpenAI({ model: "gpt-4o" });

const eventStream = await model.streamEvents(
  "Tell me about TypeScript",
  { version: "v2" }
);

for await (const event of eventStream) {
  if (event.event === "on_chat_model_start") {
    console.log("\nModel started");
  } else if (event.event === "on_chat_model_stream") {
    const content = event.data?.chunk?.content;
    if (content) {
      process.stdout.write(content);
    }
  } else if (event.event === "on_chat_model_end") {
    console.log("\nModel finished");
    console.log("Usage:", event.data?.output?.usage_metadata);
  }
}

Filtering Stream Events

Filter events by type or tag:
import { ChatOpenAI } from "@langchain/openai";
import { PromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";

const chain = RunnableSequence.from([
  PromptTemplate.fromTemplate("Explain {topic}"),
  new ChatOpenAI({ model: "gpt-4o" }).withConfig({
    tags: ["my-model"]
  })
]);

const eventStream = await chain.streamEvents(
  { topic: "recursion" },
  {
    version: "v2",
    includeNames: ["ChatOpenAI"],  // Only these components
    includeTags: ["my-model"]       // Only these tags
  }
);

for await (const event of eventStream) {
  console.log(event.event, event.name);
}
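The same filtering can also be applied client-side over any event iterable, which is handy when the server doesn't filter for you. A generic sketch — the event shape mirrors `streamEvents` output, and the mock generator stands in for a real call:

```typescript
// Yield only events whose `event` name is in the allowed set.
async function* filterEvents<T extends { event: string }>(
  source: AsyncIterable<T>,
  allowed: Set<string>
): AsyncGenerator<T> {
  for await (const ev of source) {
    if (allowed.has(ev.event)) yield ev;
  }
}

// Mock events in place of a real streamEvents call.
async function* mockEvents() {
  yield { event: "on_chat_model_start" };
  yield { event: "on_chat_model_stream" };
  yield { event: "on_chat_model_end" };
}

const kept: string[] = [];
for await (const ev of filterEvents(mockEvents(), new Set(["on_chat_model_stream"]))) {
  kept.push(ev.event);
}
```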

Streaming HTTP Responses

Stream to HTTP clients:

Express.js

import express from "express";
import { ChatOpenAI } from "@langchain/openai";

const app = express();
const model = new ChatOpenAI({ model: "gpt-4o" });

app.get("/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  
  const stream = await model.stream(
    req.query.message as string
  );
  
  for await (const chunk of stream) {
    res.write(`data: ${JSON.stringify({ content: chunk.content })}\n\n`);
  }
  
  res.write("data: [DONE]\n\n");
  res.end();
});

app.listen(3000);
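Server-Sent Events frames are just `data: <payload>\n\n` strings, so the formatting in the route above can be pulled into a tiny helper to keep the handler tidy. `sseFrame` is an illustrative name:

```typescript
// Format a payload as a single SSE data frame.
function sseFrame(payload: unknown): string {
  return `data: ${JSON.stringify(payload)}\n\n`;
}

const frame = sseFrame({ content: "Hi" });
```

In the Express handler, `res.write(sseFrame({ content: chunk.content }))` replaces the inline template string.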

Next.js API Route

// app/api/chat/route.ts
import { ChatOpenAI } from "@langchain/openai";
import { StreamingTextResponse } from "ai";

export async function POST(req: Request) {
  const { message } = await req.json();
  
  const model = new ChatOpenAI({ model: "gpt-4o" });
  const stream = await model.stream(message);
  
  // Convert to ReadableStream
  const readableStream = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        controller.enqueue(chunk.content);
      }
      controller.close();
    }
  });
  
  return new StreamingTextResponse(readableStream);
}

Vercel AI SDK Integration

import { ChatOpenAI } from "@langchain/openai";
import { LangChainAdapter } from "ai";

const model = new ChatOpenAI({ model: "gpt-4o" });

const stream = await model.stream("Tell me a joke");

// Convert to Vercel AI SDK stream
return LangChainAdapter.toDataStreamResponse(stream);

Cancellation

Cancel streaming operations:
import { ChatOpenAI } from "@langchain/openai";

const controller = new AbortController();

const model = new ChatOpenAI({ model: "gpt-4o" });

const stream = await model.stream(
  "Write a long essay",
  { signal: controller.signal }
);

// Cancel after 2 seconds
setTimeout(() => {
  controller.abort();
  console.log("\nStream cancelled!");
}, 2000);

try {
  for await (const chunk of stream) {
    process.stdout.write(chunk.content);
  }
} catch (error) {
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream was aborted");
  }
}
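The control flow above is easiest to see with a mock stream that honors an `AbortSignal`; real model streams raise a similar `AbortError` when the signal fires. A sketch with illustrative names:

```typescript
// Mock stream that checks the signal before each chunk.
async function* mockAbortableStream(signal: AbortSignal) {
  for (const word of ["one ", "two ", "three "]) {
    if (signal.aborted) {
      const err = new Error("The operation was aborted");
      err.name = "AbortError";
      throw err;
    }
    yield { content: word };
  }
}

const controller = new AbortController();
let received = "";
let aborted = false;

try {
  for await (const chunk of mockAbortableStream(controller.signal)) {
    received += chunk.content;
    controller.abort(); // cancel after the first chunk
  }
} catch (error) {
  aborted = error instanceof Error && error.name === "AbortError";
}
```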

Buffering and Batching

Buffer Tokens

Collect tokens before displaying:
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o" });
const stream = await model.stream("Write a story");

let buffer = "";
const bufferSize = 10; // Characters

for await (const chunk of stream) {
  buffer += chunk.content;
  
  if (buffer.length >= bufferSize) {
    process.stdout.write(buffer);
    buffer = "";
  }
}

// Flush remaining buffer
if (buffer) {
  process.stdout.write(buffer);
}
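The buffering loop above can be packaged as a reusable async generator that re-chunks any token stream into pieces of at least `size` characters, flushing the remainder at the end. `rechunk` is an illustrative name:

```typescript
// Re-chunk a token stream into larger pieces.
async function* rechunk(
  source: AsyncIterable<{ content: string }>,
  size: number
): AsyncGenerator<string> {
  let buffer = "";
  for await (const chunk of source) {
    buffer += chunk.content;
    if (buffer.length >= size) {
      yield buffer;
      buffer = "";
    }
  }
  if (buffer) yield buffer; // flush the remainder
}

// Mock token stream standing in for `await model.stream(...)`.
async function* mockTokens() {
  for (const t of ["A ", "short ", "story ", "about ", "robots."]) {
    yield { content: t };
  }
}

const pieces: string[] = [];
for await (const piece of rechunk(mockTokens(), 10)) {
  pieces.push(piece);
}
```

No content is lost: the concatenated pieces always equal the original stream.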

Batch Updates

Send updates at intervals:
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({ model: "gpt-4o" });
const stream = await model.stream("Write a story");

let accumulated = "";
let lastUpdate = Date.now();
const updateInterval = 100; // ms

for await (const chunk of stream) {
  accumulated += chunk.content;
  
  const now = Date.now();
  if (now - lastUpdate >= updateInterval) {
    console.log("Update:", accumulated);
    lastUpdate = now;
  }
}

console.log("Final:", accumulated);

Streaming Best Practices

Manage slow consumers:
async function* streamWithBackpressure(
  source: AsyncIterable<any>,
  maxBuffer: number = 10
) {
  const buffer: any[] = [];
  
  for await (const chunk of source) {
    buffer.push(chunk);
    
    // Yield when buffer is full
    if (buffer.length >= maxBuffer) {
      yield* buffer;
      buffer.length = 0;
    }
  }
  
  // Yield remaining
  if (buffer.length > 0) {
    yield* buffer;
  }
}
Prevent hanging streams:
async function streamWithTimeout(
  stream: AsyncIterable<any>,
  timeoutMs: number
) {
  const timeout = new Promise((_, reject) => {
    setTimeout(() => reject(new Error("Stream timeout")), timeoutMs);
  });
  
  const chunks: any[] = [];
  
  try {
    await Promise.race([
      (async () => {
        for await (const chunk of stream) {
          chunks.push(chunk);
        }
      })(),
      timeout
    ]);
  } catch (error) {
    console.error("Stream timed out");
  }
  
  return chunks;
}
Handle streaming errors gracefully:
async function robustStream(
  model: ChatOpenAI,
  input: string,
  maxRetries: number = 3
) {
  let attempt = 0;
  
  while (attempt < maxRetries) {
    try {
      const stream = await model.stream(input);
      
      for await (const chunk of stream) {
        process.stdout.write(chunk.content);
      }
      
      return; // Success
    } catch (error) {
      attempt++;
      console.error(`Stream error (attempt ${attempt}):`, error);
      
      if (attempt >= maxRetries) {
        throw error;
      }
      
      // Exponential backoff
      await new Promise(resolve => 
        setTimeout(resolve, Math.pow(2, attempt) * 1000)
      );
    }
  }
}
Track streaming metrics:
class StreamMonitor {
  private startTime = 0;
  private chunkCount = 0;
  private totalChars = 0;
  
  start() {
    this.startTime = Date.now();
  }
  
  trackChunk(content: string) {
    this.chunkCount++;
    this.totalChars += content.length;
  }
  
  getMetrics() {
    const duration = Date.now() - this.startTime;
    return {
      duration,
      chunkCount: this.chunkCount,
      totalChars: this.totalChars,
      charsPerSecond: (this.totalChars / duration) * 1000,
      avgChunkSize: this.totalChars / this.chunkCount
    };
  }
}

// `stream` here is any model stream, e.g. `await model.stream("Write a story")`
const monitor = new StreamMonitor();
monitor.start();

for await (const chunk of stream) {
  monitor.trackChunk(chunk.content);
  process.stdout.write(chunk.content);
}

console.log(monitor.getMetrics());
Compress streamed data for network efficiency:
import { createGzip } from "zlib";
import { pipeline } from "stream";

app.get("/chat", async (req, res) => {
  res.setHeader("Content-Encoding", "gzip");
  res.setHeader("Content-Type", "text/event-stream");
  
  const gzip = createGzip();
  pipeline(gzip, res, (err) => {
    if (err) console.error("Stream error:", err);
  });
  
  const stream = await model.stream(req.query.message as string);
  
  for await (const chunk of stream) {
    gzip.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  
  gzip.end();
});

Next Steps

  • Building Agents - Stream agent execution and tool calls
  • Working with Chat Models - Stream responses from different models
  • Callbacks and Tracing - Monitor streaming with callbacks
  • Retrieval - Stream RAG responses
