Skip to main content
LangGraph는 실시간 업데이트를 제공하기 위한 스트리밍 시스템을 구현합니다. 스트리밍은 LLM 기반 애플리케이션의 응답성을 향상시키는 데 매우 중요합니다. 완전한 응답이 준비되기 전에도 출력을 점진적으로 표시함으로써, 스트리밍은 특히 LLM의 지연 시간을 처리할 때 사용자 경험(UX)을 크게 개선합니다. LangGraph 스트리밍으로 가능한 것들:

지원되는 스트림 모드

다음 스트림 모드 중 하나 이상을 리스트로 stream() 또는 astream() 메서드에 전달합니다:
모드설명
values그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
updates그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다. 동일한 단계에서 여러 업데이트가 발생하면 (예: 여러 노드가 실행됨), 해당 업데이트는 개별적으로 스트리밍됩니다.
custom그래프 노드 내부에서 커스텀 데이터를 스트리밍합니다.
messagesLLM이 호출되는 모든 그래프 노드에서 2-튜플 (LLM 토큰, 메타데이터)을 스트리밍합니다.
debug그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다.

기본 사용 예제

LangGraph 그래프는 스트리밍 출력을 이터레이터로 생성하기 위해 .stream() (동기) 및 .astream() (비동기) 메서드를 제공합니다.
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# stream() 메서드는 스트리밍 출력을 생성하는 이터레이터를 반환합니다
for chunk in graph.stream(
    {"topic": "ice cream"},
    # 각 노드 후 그래프 상태에 대한 업데이트만 스트리밍하려면 stream_mode="updates"로 설정합니다
    # 다른 스트림 모드도 사용할 수 있습니다. 지원되는 스트림 모드를 참조하세요
    stream_mode="updates",
):
    print(chunk)
{'refineTopic': {'topic': 'ice cream and cats'}}
{'generateJoke': {'joke': 'This is a joke about ice cream and cats'}}

여러 모드 스트리밍

stream_mode 매개변수에 리스트를 전달하여 여러 모드를 동시에 스트리밍할 수 있습니다. 스트리밍되는 출력은 (mode, chunk) 튜플이 되며, 여기서 mode는 스트림 모드의 이름이고 chunk는 해당 모드에서 스트리밍된 데이터입니다.
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)

그래프 상태 스트리밍

updatesvalues 스트림 모드를 사용하여 그래프 실행 시 상태를 스트리밍합니다.
  • updates는 그래프의 각 단계 후 상태에 대한 업데이트를 스트리밍합니다.
  • values는 그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
  topic: str
  joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
  • updates
  • values
각 단계 후 노드가 반환한 상태 업데이트만 스트리밍하려면 이 모드를 사용합니다. 스트리밍되는 출력에는 노드의 이름과 업데이트가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk)

서브그래프 출력 스트리밍

스트리밍되는 출력에 서브그래프의 출력을 포함하려면, 부모 그래프의 .stream() 메서드에서 subgraphs=True로 설정할 수 있습니다. 이렇게 하면 부모 그래프와 모든 서브그래프의 출력이 스트리밍됩니다. 출력은 (namespace, data) 튜플로 스트리밍되며, 여기서 namespace는 서브그래프가 호출되는 노드의 경로를 가진 튜플입니다. 예: ("parent_node:<task_id>", "child_node:<task_id>")
for chunk in graph.stream(
    {"foo": "foo"},
    # 서브그래프의 출력을 스트리밍하려면 subgraphs=True로 설정합니다
    subgraphs=True,
    stream_mode="updates",
):
    print(chunk)
from langgraph.graph import START, StateGraph
from typing import TypedDict

# 서브그래프 정의
class SubgraphState(TypedDict):
    foo: str  # 이 키는 부모 그래프 상태와 공유됩니다
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 부모 그래프 정의
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # 서브그래프의 출력을 스트리밍하려면 subgraphs=True로 설정합니다
    subgraphs=True,
):
    print(chunk)
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
참고 노드 업데이트뿐만 아니라 어떤 그래프(또는 서브그래프)에서 스트리밍하고 있는지 알려주는 네임스페이스도 받고 있습니다.

디버깅

debug 스트리밍 모드를 사용하여 그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다. 스트리밍되는 출력에는 노드의 이름과 전체 상태가 포함됩니다.
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    print(chunk)

LLM 토큰

messages 스트리밍 모드를 사용하여 노드, 도구, 서브그래프 또는 태스크를 포함한 그래프의 모든 부분에서 대규모 언어 모델(LLM) 출력을 토큰 단위로 스트리밍합니다. messages 모드에서 스트리밍되는 출력은 (message_chunk, metadata) 튜플입니다:
  • message_chunk: LLM에서 나온 토큰 또는 메시지 세그먼트입니다.
  • metadata: 그래프 노드 및 LLM 호출에 대한 세부 정보가 포함된 딕셔너리입니다.
LLM이 LangChain 통합으로 제공되지 않는 경우, custom 모드를 사용하여 출력을 스트리밍할 수 있습니다. 자세한 내용은 모든 LLM과 함께 사용을 참조하세요.
Python < 3.11에서 async 사용 시 수동 구성 필요 Python < 3.11에서 async 코드를 사용할 때는 적절한 스트리밍을 활성화하기 위해 명시적으로 RunnableConfigainvoke()에 전달해야 합니다. 자세한 내용은 Python < 3.11에서 Async를 참조하거나 Python 3.11+로 업그레이드하세요.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


llm = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """주제에 대한 농담을 생성하기 위해 LLM을 호출합니다"""
    # LLM이 .stream이 아닌 .invoke를 사용하여 실행되더라도 메시지 이벤트가 발생합니다
    llm_response = llm.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": llm_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# "messages" 스트림 모드는 (message_chunk, metadata) 튜플의 이터레이터를 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)

LLM 호출별 필터링

LLM 호출에 tags를 연결하여 LLM 호출별로 스트리밍된 토큰을 필터링할 수 있습니다.
from langchain.chat_models import init_chat_model

# llm_1은 "joke" 태그가 지정됩니다
llm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke'])
# llm_2는 "poem" 태그가 지정됩니다
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem'])

graph = ... # 이러한 LLM을 사용하는 그래프를 정의합니다

# stream_mode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
# metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    # metadata의 tags 필드로 스트리밍된 토큰을 필터링하여
    # "joke" 태그가 있는 LLM 호출의 토큰만 포함합니다
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# joke_model은 "joke" 태그가 지정됩니다
joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"])
# poem_model은 "poem" 태그가 지정됩니다
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"])


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # 참고: Python < 3.11에서는 config를 명시적으로 전달해야 합니다
      # 그 이전에는 컨텍스트 변수 지원이 추가되지 않았기 때문입니다: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      # 컨텍스트 변수가 올바르게 전파되도록 config를 명시적으로 전달합니다
      # Python < 3.11에서 async 코드를 사용할 때 이것이 필요합니다. 자세한 내용은 async 섹션을 참조하세요
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config,
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config,
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

# stream_mode를 "messages"로 설정하여 LLM 토큰을 스트리밍합니다
# metadata에는 태그를 포함한 LLM 호출에 대한 정보가 포함됩니다
async for msg, metadata in graph.astream(
      {"topic": "cats"},
      stream_mode="messages",
):
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)

노드별 필터링

특정 노드에서만 토큰을 스트리밍하려면, stream_mode="messages"를 사용하고 스트리밍된 메타데이터의 langgraph_node 필드로 출력을 필터링합니다:
# "messages" 스트림 모드는 (message_chunk, metadata) 튜플을 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",
):
    # metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
    # 지정된 노드의 토큰만 포함합니다
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        ...
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # 농담과 시를 동시에 작성합니다
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

# "messages" 스트림 모드는 (message_chunk, metadata) 튜플을 반환합니다
# 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 딕셔너리입니다
# LLM이 호출된 그래프 노드에 대한 정보 및 기타 정보가 포함됩니다
for msg, metadata in graph.stream(
    {"topic": "cats"},
    stream_mode="messages",
):
    # metadata의 langgraph_node 필드로 스트리밍된 토큰을 필터링하여
    # write_poem 노드의 토큰만 포함합니다
    if msg.content and metadata["langgraph_node"] == "write_poem":
        print(msg.content, end="|", flush=True)

커스텀 데이터 스트리밍

LangGraph 노드 또는 도구 내부에서 커스텀 사용자 정의 데이터를 전송하려면 다음 단계를 따르세요:
  1. get_stream_writer()를 사용하여 스트림 라이터에 액세스하고 커스텀 데이터를 발생시킵니다.
  2. .stream() 또는 .astream()을 호출할 때 stream_mode="custom"으로 설정하여 스트림에서 커스텀 데이터를 가져옵니다. 여러 모드를 결합할 수 있지만 (예: ["updates", "custom"]), 적어도 하나는 "custom"이어야 합니다.
Python < 3.11에서 async에서 get_stream_writer() 사용 불가 Python < 3.11에서 실행되는 async 코드에서는 get_stream_writer()가 작동하지 않습니다. 대신, 노드 또는 도구에 writer 매개변수를 추가하고 수동으로 전달하세요. 사용 예제는 Python < 3.11에서 Async를 참조하세요.
  • node
  • tool
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # 커스텀 데이터를 전송하기 위해 스트림 라이터를 가져옵니다
    writer = get_stream_writer()
    # 커스텀 키-값 쌍을 발생시킵니다 (예: 진행 상황 업데이트)
    writer({"custom_key": "Generating custom data inside node"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)

모든 LLM과 함께 사용

stream_mode="custom"을 사용하여 모든 LLM API에서 데이터를 스트리밍할 수 있습니다 — 해당 API가 LangChain 채팅 모델 인터페이스를 구현하지 않더라도 말입니다. 이를 통해 자체 스트리밍 인터페이스를 제공하는 원시 LLM 클라이언트 또는 외부 서비스를 통합할 수 있어, LangGraph를 커스텀 설정에 매우 유연하게 사용할 수 있습니다.
from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """임의의 모델을 호출하고 출력을 스트리밍하는 예제 노드"""
    # 커스텀 데이터를 전송하기 위해 스트림 라이터를 가져옵니다
    writer = get_stream_writer()
    # 청크를 생성하는 스트리밍 클라이언트가 있다고 가정합니다
    # 커스텀 스트리밍 클라이언트를 사용하여 LLM 토큰을 생성합니다
    for chunk in your_custom_streaming_client(state["topic"]):
        # writer를 사용하여 스트림에 커스텀 데이터를 전송합니다
        writer({"custom_llm_chunk": chunk})
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # 필요에 따라 다른 노드와 엣지를 추가합니다
    .compile()
)

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
):
    # 청크에는 LLM에서 스트리밍된 커스텀 데이터가 포함됩니다
    print(chunk)
import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# 이것은 우리의 도구입니다
async def get_items(place: str) -> str:
    """질문한 장소에서 찾을 수 있는 항목을 나열하기 위해 이 도구를 사용합니다."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# 이것은 도구 호출 그래프 노드입니다
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
도구 호출을 포함하는 AI 메시지로 그래프를 호출해 봅시다:
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)

특정 채팅 모델에 대한 스트리밍 비활성화

애플리케이션에서 스트리밍을 지원하는 모델과 지원하지 않는 모델을 혼합하여 사용하는 경우, 스트리밍을 지원하지 않는 모델에 대해 명시적으로 스트리밍을 비활성화해야 할 수 있습니다. 모델을 초기화할 때 disable_streaming=True로 설정합니다.
  • init_chat_model
  • chat model interface
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    # 채팅 모델에 대한 스트리밍을 비활성화하려면 disable_streaming=True로 설정합니다
    disable_streaming=True
)

Python < 3.11에서 Async

Python 버전 < 3.11에서는 asyncio 태스크context 매개변수를 지원하지 않습니다. 이는 LangGraph가 컨텍스트를 자동으로 전파하는 기능을 제한하며, LangGraph의 스트리밍 메커니즘에 두 가지 주요 방식으로 영향을 미칩니다:
  1. 콜백이 자동으로 전파되지 않으므로, async LLM 호출(예: ainvoke())에 명시적으로 RunnableConfig를 전달해야 합니다.
  2. async 노드 또는 도구에서 get_stream_writer()를 사용할 수 없습니다writer 인수를 직접 전달해야 합니다.
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

llm = init_chat_model(model="openai:gpt-4o-mini")

class State(TypedDict):
    topic: str
    joke: str

# async 노드 함수에서 config를 인수로 받습니다
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # 적절한 컨텍스트 전파를 보장하기 위해 llm.ainvoke()에 config를 전달합니다
    joke_response = await llm.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# LLM 토큰을 스트리밍하려면 stream_mode="messages"로 설정합니다
async for chunk, metadata in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if chunk.content:
        print(chunk.content, end="|", flush=True)
from typing import TypedDict
from langgraph.types import StreamWriter

class State(TypedDict):
      topic: str
      joke: str

# async 노드 또는 도구의 함수 시그니처에 writer를 인수로 추가합니다
# LangGraph는 자동으로 스트림 라이터를 함수에 전달합니다
async def generate_joke(state: State, writer: StreamWriter):
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
)

# 스트림에서 커스텀 데이터를 받으려면 stream_mode="custom"으로 설정합니다
async for chunk in graph.astream(
      {"topic": "ice cream"},
      stream_mode="custom",
):
      print(chunk)

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I