Skip to main content
LangGraph는 실시간 업데이트를 제공하기 위한 스트리밍 시스템을 구현합니다. 스트리밍은 LLM 기반 애플리케이션의 응답성을 향상시키는 데 매우 중요합니다. 완전한 응답이 준비되기 전에도 출력을 점진적으로 표시함으로써, 스트리밍은 특히 LLM의 지연 시간을 처리할 때 사용자 경험(UX)을 크게 개선합니다. LangGraph 스트리밍으로 가능한 것들:

지원되는 스트림 모드

다음 스트림 모드 중 하나 이상을 리스트로 stream() 메서드에 전달합니다:
모드설명
values그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
updates그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다. 동일한 단계에서 여러 업데이트가 발생하면 (예: 여러 노드가 실행됨), 해당 업데이트는 개별적으로 스트리밍됩니다.
custom그래프 노드 내부에서 커스텀 데이터를 스트리밍합니다.
messagesLLM이 호출되는 모든 그래프 노드에서 2-튜플 (LLM 토큰, 메타데이터)을 스트리밍합니다.
debug그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다.

기본 사용 예제

LangGraph 그래프는 스트리밍 출력을 이터레이터로 생성하기 위해 .stream() 메서드를 제공합니다.
for await (const chunk of await graph.stream(inputs, {
  streamMode: "updates",
})) {
  console.log(chunk);
}
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();

for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  // 각 노드 후 그래프 상태에 대한 업데이트만 스트리밍하려면 streamMode: "updates"로 설정합니다
  // 다른 스트림 모드도 사용할 수 있습니다. 지원되는 스트림 모드를 참조하세요
  { streamMode: "updates" }
)) {
  console.log(chunk);
}
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

여러 모드 스트리밍

streamMode 매개변수에 배열을 전달하여 여러 모드를 동시에 스트리밍할 수 있습니다. 스트리밍되는 출력은 [mode, chunk] 튜플이 되며, 여기서 mode는 스트림 모드의 이름이고 chunk는 해당 모드에서 스트리밍된 데이터입니다.
for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  console.log(chunk);
}

그래프 상태 스트리밍

updatesvalues 스트림 모드를 사용하여 그래프 실행 시 상태를 스트리밍합니다.
  • updates는 그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다.
  • values는 그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
import { StateGraph, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();
  • updates
  • values
각 단계 후 노드가 반환한 상태 업데이트만 스트리밍하려면 이 모드를 사용합니다. 스트리밍되는 출력에는 노드의 이름과 업데이트가 포함됩니다.
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}

서브그래프 출력 스트리밍

스트리밍되는 출력에 서브그래프의 출력을 포함하려면, 부모 그래프의 .stream() 메서드에서 subgraphs: true로 설정할 수 있습니다. 이렇게 하면 부모 그래프와 모든 서브그래프의 출력이 스트리밍됩니다. 출력은 [namespace, data] 튜플로 스트리밍되며, 여기서 namespace는 서브그래프가 호출되는 노드의 경로를 가진 튜플입니다. 예: ["parent_node:<task_id>", "child_node:<task_id>"]
for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    // 서브그래프의 출력을 스트리밍하려면 subgraphs: true로 설정합니다
    subgraphs: true,
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// 서브그래프 정의
const SubgraphState = z.object({
  foo: z.string(), // 이 키는 부모 그래프 상태와 공유됩니다
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();

// 부모 그래프 정의
const ParentState = z.object({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");
const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    // 서브그래프의 출력을 스트리밍하려면 subgraphs: true로 설정합니다
    subgraphs: true,
  }
)) {
  console.log(chunk);
}
[[], {'node1': {'foo': 'hi! foo'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode1': {'bar': 'bar'}}]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], {'subgraphNode2': {'foo': 'hi! foobar'}}]
[[], {'node2': {'foo': 'hi! foobar'}}]
참고 노드 업데이트뿐만 아니라 어떤 그래프(또는 서브그래프)에서 스트리밍하고 있는지 알려주는 네임스페이스도 받고 있습니다.

디버깅

debug 스트리밍 모드를 사용하여 그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다. 스트리밍되는 출력에는 노드의 이름과 전체 상태가 포함됩니다.
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "debug" }
)) {
  console.log(chunk);
}

LLM 토큰

messages 스트리밍 모드를 사용하여 노드, 도구, 서브그래프 또는 태스크를 포함한 그래프의 모든 부분에서 대규모 언어 모델(LLM) 출력을 토큰 단위로 스트리밍합니다. messages 모드에서 스트리밍되는 출력은 [message_chunk, metadata] 튜플입니다:
  • message_chunk: LLM에서 나온 토큰 또는 메시지 세그먼트입니다.
  • metadata: 그래프 노드 및 LLM 호출에 대한 세부 정보가 포함된 딕셔너리입니다.
LLM이 LangChain 통합으로 제공되지 않는 경우, custom 모드를 사용하여 출력을 스트리밍할 수 있습니다. 자세한 내용은 모든 LLM과 함께 사용을 참조하세요.
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const MyState = z.object({
  topic: z.string(),
  joke: z.string().default(""),
});

const llm = new ChatOpenAI({ model: "gpt-4o-mini" });

const callModel = async (state: z.infer<typeof MyState>) => {
  // 주제에 대한 농담을 생성하기 위해 LLM을 호출합니다
  // LLM이 .stream이 아닌 .invoke를 사용하여 실행되더라도 메시지 이벤트가 발생합니다
  const llmResponse = await llm.invoke([
    { role: "user", content: `Generate a joke about ${state.topic}` },
  ]);
  return { joke: llmResponse.content };
};

const graph = new StateGraph(MyState)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

// "messages" 스트림 모드는 [messageChunk, metadata] 튜플의 이터레이터를 반환합니다
// 여기서 messageChunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
// LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for await (const [messageChunk, metadata] of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "messages" }
)) {
  if (messageChunk.content) {
    console.log(messageChunk.content + "|");
  }
}

LLM 호출별 필터링

LLM 호출에 tags를 연결하여 LLM 호출별로 스트리밍된 토큰을 필터링할 수 있습니다.
import { ChatOpenAI } from "@langchain/openai";

// llm1은 "joke" 태그가 지정됩니다
const llm1 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['joke']
});
// llm2는 "poem" 태그가 지정됩니다
const llm2 = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ['poem']
});

const graph = // ... 이러한 LLM을 사용하는 그래프를 정의합니다

// streamMode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
// metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // metadata의 tags 필드로 스트리밍된 토큰을 필터링하여
  // "joke" 태그가 있는 LLM 호출의 토큰만 포함합니다
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

// jokeModel은 "joke" 태그가 지정됩니다
const jokeModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["joke"]
});
// poemModel은 "poem" 태그가 지정됩니다
const poemModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  tags: ["poem"]
});

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("callModel", (state) => {
    const topic = state.topic;
    console.log("Writing joke...");

    const jokeResponse = await jokeModel.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);

    console.log("\n\nWriting poem...");
    const poemResponse = await poemModel.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);

    return {
      joke: jokeResponse.content,
      poem: poemResponse.content
    };
  })
  .addEdge(START, "callModel")
  .compile();

// streamMode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
// metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // metadata의 tags 필드로 스트리밍된 토큰을 필터링하여
  // "joke" 태그가 있는 LLM 호출의 토큰만 포함합니다
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}

노드별 필터링

특정 노드에서만 토큰을 스트리밍하려면, stream_mode="messages"를 사용하고 스트리밍된 메타데이터의 langgraph_node 필드로 출력을 필터링합니다:
// "messages" 스트림 모드는 [messageChunk, metadata] 튜플을 반환합니다
// 여기서 messageChunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
// LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for await (const [msg, metadata] of await graph.stream(
  inputs,
  { streamMode: "messages" }
)) {
  // metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
  // 지정된 노드의 토큰만 포함합니다
  if (msg.content && metadata.langgraph_node === "some_node_name") {
    // ...
  }
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, START } from "@langchain/langgraph";
import * as z from "zod";

const model = new ChatOpenAI({ model: "gpt-4o-mini" });

const State = z.object({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const graph = new StateGraph(State)
  .addNode("writeJoke", async (state) => {
    const topic = state.topic;
    const jokeResponse = await model.invoke([
      { role: "user", content: `Write a joke about ${topic}` }
    ]);
    return { joke: jokeResponse.content };
  })
  .addNode("writePoem", async (state) => {
    const topic = state.topic;
    const poemResponse = await model.invoke([
      { role: "user", content: `Write a short poem about ${topic}` }
    ]);
    return { poem: poemResponse.content };
  })
  // 농담과 시를 동시에 작성합니다
  .addEdge(START, "writeJoke")
  .addEdge(START, "writePoem")
  .compile();

// "messages" 스트림 모드는 [messageChunk, metadata] 튜플을 반환합니다
// 여기서 messageChunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
// LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
  // writePoem 노드의 토큰만 포함합니다
  if (msg.content && metadata.langgraph_node === "writePoem") {
    console.log(msg.content + "|");
  }
}

커스텀 데이터 스트리밍

LangGraph 노드 또는 도구 내부에서 커스텀 사용자 정의 데이터를 전송하려면 다음 단계를 따르세요:
  1. LangGraphRunnableConfigwriter 매개변수를 사용하여 커스텀 데이터를 발생시킵니다.
  2. .stream()을 호출할 때 streamMode: "custom"으로 설정하여 스트림에서 커스텀 데이터를 가져옵니다. 여러 모드를 결합할 수 있지만 (예: ["updates", "custom"]), 적어도 하나는 "custom"이어야 합니다.
  • node
  • tool
import { StateGraph, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const State = z.object({
  query: z.string(),
  answer: z.string(),
});

const graph = new StateGraph(State)
  .addNode("node", async (state, config) => {
    // 커스텀 키-값 쌍을 발생시키기 위해 writer를 사용합니다 (예: 진행 상황 업데이트)
    config.writer({ custom_key: "Generating custom data inside node" });
    return { answer: "some data" };
  })
  .addEdge(START, "node")
  .compile();

const inputs = { query: "example" };

// 스트림에서 커스텀 데이터를 받으려면 streamMode: "custom"으로 설정합니다
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
  console.log(chunk);
}

모든 LLM과 함께 사용

streamMode: "custom"을 사용하여 모든 LLM API에서 데이터를 스트리밍할 수 있습니다 — 해당 API가 LangChain 채팅 모델 인터페이스를 구현하지 않더라도 말입니다. 이를 통해 자체 스트리밍 인터페이스를 제공하는 원시 LLM 클라이언트 또는 외부 서비스를 통합할 수 있어, LangGraph를 커스텀 설정에 매우 유연하게 사용할 수 있습니다.
import { LangGraphRunnableConfig } from "@langchain/langgraph";

const callArbitraryModel = async (
  state: any,
  config: LangGraphRunnableConfig
) => {
  // 임의의 모델을 호출하고 출력을 스트리밍하는 예제 노드
  // 청크를 생성하는 스트리밍 클라이언트가 있다고 가정합니다
  // 커스텀 스트리밍 클라이언트를 사용하여 LLM 토큰을 생성합니다
  for await (const chunk of yourCustomStreamingClient(state.topic)) {
    // writer를 사용하여 스트림에 커스텀 데이터를 전송합니다
    config.writer({ custom_llm_chunk: chunk });
  }
  return { result: "completed" };
};

const graph = new StateGraph(State)
  .addNode("callArbitraryModel", callArbitraryModel)
  // 필요에 따라 다른 노드와 엣지를 추가합니다
  .compile();

// 스트림에서 커스텀 데이터를 받으려면 streamMode: "custom"으로 설정합니다
for await (const chunk of await graph.stream(
  { topic: "cats" },
  { streamMode: "custom" }
)) {
  // 청크에는 LLM에서 스트리밍된 커스텀 데이터가 포함됩니다
  console.log(chunk);
}
import { StateGraph, START, MessagesZodMeta, LangGraphRunnableConfig } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";
import { registry } from "@langchain/langgraph/zod";
import * as z from "zod";
import OpenAI from "openai";

const openaiClient = new OpenAI();
const modelName = "gpt-4o-mini";

async function* streamTokens(modelName: string, messages: any[]) {
  const response = await openaiClient.chat.completions.create({
    messages,
    model: modelName,
    stream: true,
  });

  let role: string | null = null;
  for await (const chunk of response) {
    const delta = chunk.choices[0]?.delta;

    if (delta?.role) {
      role = delta.role;
    }

    if (delta?.content) {
      yield { role, content: delta.content };
    }
  }
}

// 이것은 우리의 도구입니다
const getItems = tool(
  async (input, config: LangGraphRunnableConfig) => {
    let response = "";
    for await (const msgChunk of streamTokens(
      modelName,
      [
        {
          role: "user",
          content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
        },
      ]
    )) {
      response += msgChunk.content;
      config.writer?.(msgChunk);
    }
    return response;
  },
  {
    name: "get_items",
    description: "Use this tool to list items one might find in a place you're asked about.",
    schema: z.object({
      place: z.string().describe("The place to look up items for."),
    }),
  }
);

const State = z.object({
  messages: z
    .array(z.custom<BaseMessage>())
    .register(registry, MessagesZodMeta),
});

const graph = new StateGraph(State)
  // 이것은 도구 호출 그래프 노드입니다
  .addNode("callTool", async (state) => {
    const aiMessage = state.messages.at(-1);
    const toolCall = aiMessage.tool_calls?.at(-1);

    const functionName = toolCall?.function?.name;
    if (functionName !== "get_items") {
      throw new Error(`Tool ${functionName} not supported`);
    }

    const functionArguments = toolCall?.function?.arguments;
    const args = JSON.parse(functionArguments);

    const functionResponse = await getItems.invoke(args);
    const toolMessage = {
      tool_call_id: toolCall.id,
      role: "tool",
      name: functionName,
      content: functionResponse,
    };
    return { messages: [toolMessage] };
  })
  .addEdge(START, "callTool")
  .compile();
도구 호출을 포함하는 AI 메시지로 그래프를 호출해 봅시다:
const inputs = {
  messages: [
    {
      content: null,
      role: "assistant",
      tool_calls: [
        {
          id: "1",
          function: {
            arguments: '{"place":"bedroom"}',
            name: "get_items",
          },
          type: "function",
        }
      ],
    }
  ]
};

for await (const chunk of await graph.stream(
  inputs,
  { streamMode: "custom" }
)) {
  console.log(chunk.content + "|");
}

특정 채팅 모델에 대한 스트리밍 비활성화

애플리케이션에서 스트리밍을 지원하는 모델과 지원하지 않는 모델을 혼합하여 사용하는 경우, 스트리밍을 지원하지 않는 모델에 대해 명시적으로 스트리밍을 비활성화해야 할 수 있습니다. 모델을 초기화할 때 streaming: false로 설정합니다.
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  // 채팅 모델에 대한 스트리밍을 비활성화하려면 streaming: false로 설정합니다
  streaming: false,
});

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I