Skip to main content
LangGraph SDK를 사용하면 LangGraph Server API에서 출력을 스트리밍할 수 있습니다.
LangGraph SDK와 LangGraph Server는 LangSmith의 일부입니다.

기본 사용법

기본 사용 예제:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
다음은 LangGraph API 서버에서 실행할 수 있는 예제 그래프입니다. 자세한 내용은 LangSmith 빠른 시작을 참조하세요.
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
실행 중인 LangGraph API 서버가 있으면, LangGraph SDK를 사용하여 상호작용할 수 있습니다.
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

# create a streaming run
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
  1. client.runs.stream() 메서드는 스트리밍 출력을 생성하는 이터레이터를 반환합니다. 2. stream_mode="updates"를 설정하면 각 노드 실행 후 그래프 상태의 업데이트만 스트리밍합니다. 다른 스트림 모드도 사용할 수 있습니다. 자세한 내용은 지원되는 스트림 모드를 참조하세요.
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

지원되는 스트림 모드

모드설명LangGraph 라이브러리 메서드
values슈퍼 스텝 후 전체 그래프 상태를 스트리밍합니다.stream_mode="values"와 함께 .stream() / .astream()
updates그래프의 각 단계 후 상태 업데이트를 스트리밍합니다. 동일한 단계에서 여러 업데이트가 발생하는 경우(예: 여러 노드 실행), 해당 업데이트는 개별적으로 스트리밍됩니다.stream_mode="updates"와 함께 .stream() / .astream()
messages-tupleLLM이 호출되는 그래프 노드에 대한 LLM 토큰 및 메타데이터를 스트리밍합니다(채팅 앱에 유용함).stream_mode="messages"와 함께 .stream() / .astream()
debug그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다.stream_mode="debug"와 함께 .stream() / .astream()
custom그래프 내부에서 사용자 정의 데이터를 스트리밍합니다.stream_mode="custom"와 함께 .stream() / .astream()
events그래프 상태를 포함한 모든 이벤트를 스트리밍합니다. 주로 대규모 LCEL 앱을 마이그레이션할 때 유용합니다..astream_events()

여러 모드 스트리밍

stream_mode 매개변수에 리스트를 전달하여 여러 모드를 동시에 스트리밍할 수 있습니다. 스트리밍된 출력은 (mode, chunk) 형태의 튜플로 제공되며, 여기서 mode는 스트림 모드의 이름이고 chunk는 해당 모드에서 스트리밍된 데이터입니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)

그래프 상태 스트리밍

스트림 모드 updatesvalues를 사용하여 그래프가 실행될 때 상태를 스트리밍합니다.
  • updates는 그래프의 각 단계 후 상태의 업데이트를 스트리밍합니다.
  • values는 그래프의 각 단계 후 상태의 전체 값을 스트리밍합니다.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
  topic: str
  joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
상태 유지 실행 아래 예제는 스트리밍 실행의 출력을 유지하려는 경우를 가정하며, 체크포인터 DB에 저장하고 스레드를 생성합니다. 스레드를 생성하려면:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"
# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]
실행 출력을 유지할 필요가 없는 경우, 스트리밍할 때 thread_id 대신 None을 전달할 수 있습니다.

스트림 모드: updates

각 단계 후 노드가 반환한 상태 업데이트만 스트리밍하려면 이 모드를 사용합니다. 스트리밍된 출력에는 노드 이름과 업데이트가 포함됩니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)

스트림 모드: values

각 단계 후 그래프의 전체 상태를 스트리밍하려면 이 모드를 사용합니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values"
):
    print(chunk.data)

서브그래프

스트리밍 출력에 서브그래프의 출력을 포함하려면, 상위 그래프의 .stream() 메서드에서 subgraphs=True를 설정할 수 있습니다. 이렇게 하면 상위 그래프와 모든 서브그래프의 출력을 스트리밍합니다.
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 서브그래프의 출력을 스트리밍하려면 stream_subgraphs=True를 설정합니다.
다음은 LangGraph API 서버에서 실행할 수 있는 예제 그래프입니다. 자세한 내용은 LangSmith 빠른 시작을 참조하세요.
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
실행 중인 LangGraph API 서버가 있으면, LangGraph SDK를 사용하여 상호작용할 수 있습니다.
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# Using the graph deployed with the name "agent"
assistant_id = "agent"

# create a thread
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 서브그래프의 출력을 스트리밍하려면 stream_subgraphs=True를 설정합니다.
노드 업데이트뿐만 아니라 어떤 그래프(또는 서브그래프)에서 스트리밍되는지를 알려주는 네임스페이스도 수신한다는 점에 유의하세요.

디버깅

debug 스트리밍 모드를 사용하여 그래프 실행 전반에 걸쳐 가능한 한 많은 정보를 스트리밍합니다. 스트리밍된 출력에는 노드 이름과 전체 상태가 포함됩니다.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)

LLM 토큰

messages-tuple 스트리밍 모드를 사용하여 노드, 도구, 서브그래프 또는 태스크를 포함한 그래프의 모든 부분에서 대규모 언어 모델(LLM) 출력을 토큰 단위로 스트리밍합니다. messages-tuple 모드에서 스트리밍된 출력은 (message_chunk, metadata) 튜플입니다:
  • message_chunk: LLM의 토큰 또는 메시지 세그먼트입니다.
  • metadata: 그래프 노드 및 LLM 호출에 대한 세부 정보를 포함하는 딕셔너리입니다.
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="openai:gpt-4o-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    model_response = model.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
  1. LLM이 stream이 아닌 invoke를 사용하여 실행되는 경우에도 메시지 이벤트가 발생한다는 점에 유의하세요.
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. “messages-tuple” 스트림 모드는 (message_chunk, metadata) 튜플의 이터레이터를 반환합니다. 여기서 message_chunk는 LLM이 스트리밍한 토큰이고 metadata는 LLM이 호출된 그래프 노드와 기타 정보에 대한 정보가 포함된 딕셔너리입니다.

LLM 토큰 필터링

사용자 정의 데이터 스트리밍

사용자 정의 데이터를 전송하려면:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    stream_mode="custom"
):
    print(chunk.data)

이벤트 스트리밍

그래프 상태를 포함한 모든 이벤트를 스트리밍하려면:
  • Python
  • JavaScript
  • cURL
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    print(chunk.data)

상태 비저장 실행

스트리밍 실행의 출력을 유지하지 않고 체크포인터 DB에 저장하지 않으려면 스레드를 생성하지 않고 상태 비저장 실행을 만들 수 있습니다:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.stream(
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. thread_id UUID 대신 None을 전달하고 있습니다.

참여 및 스트리밍

LangSmith를 사용하면 활성 백그라운드 실행에 참여하고 출력을 스트리밍할 수 있습니다. 이를 위해 LangGraph SDKclient.runs.join_stream 메서드를 사용할 수 있습니다:
  • Python
  • JavaScript
  • cURL
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
  1. 이것은 참여하려는 기존 실행의 run_id입니다.
출력이 버퍼링되지 않음 .join_stream을 사용하면 출력이 버퍼링되지 않으므로, 참여하기 전에 생성된 출력은 수신되지 않습니다.

API 레퍼런스

API 사용법 및 구현에 대해서는 API 레퍼런스를 참조하세요.
Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I