Skip to main content
LangSmith API를 직접 사용하여 LLM 애플리케이션을 추적하는 방법을 알아봅니다. LangSmith로 트레이스를 전송할 때는 Python 또는 TypeScript SDK를 사용하는 것을 강력히 권장합니다. 이 SDK들은 배치 처리 및 백그라운드 처리와 같은 최적화를 제공하도록 설계되어 있어 애플리케이션의 성능이 LangSmith로 트레이스를 전송하는 과정에서 영향을 받지 않습니다. 하지만 SDK를 사용할 수 없는 경우에는 LangSmith REST API를 사용하여 트레이스를 전송할 수 있습니다. 애플리케이션에서 트레이스를 동기적으로 전송하면 성능에 영향을 미칠 수 있습니다. 이 가이드에서는 LangSmith REST API를 사용하여 요청을 추적하는 방법을 보여줍니다. 전체 엔드포인트 및 요청/응답 스키마 목록은 API 문서를 참조하세요.

기본 추적

실행을 로깅하는 가장 간단한 방법은 POST 및 PATCH /runs 엔드포인트를 사용하는 것입니다. 이 라우트들은 트리 구조에 대한 최소한의 컨텍스트 정보를 요구합니다.
LangSmith REST API를 사용할 때는 요청 헤더에 "x-api-key"로 API 키를 제공해야 합니다.API 키가 여러 워크스페이스에 연결된 경우, 헤더에 "x-tenant-id"로 사용 중인 워크스페이스를 지정해야 합니다.간단한 예제에서는 요청 본문에 dotted_order 또는 trace_id 필드를 설정할 필요가 없습니다. 이 필드들은 시스템에서 자동으로 생성됩니다. 이 방법이 더 간단하지만, LangSmith에서 속도가 느리고 rate limit이 낮습니다.
다음 예제는 Python에서 API를 직접 활용하는 방법을 보여줍니다. 동일한 원칙이 다른 언어에도 적용됩니다.
import openai
import os
import requests
from datetime import datetime, timezone
from uuid import uuid4

# Send your API Key in the request headers
headers = {
    "x-api-key": os.environ["LANGSMITH_API_KEY"],
    "x-tenant-id": os.environ["LANGSMITH_WORKSPACE_ID"]
}

def post_run(run_id, name, run_type, inputs, parent_id=None):
    """Function to post a new run to the API."""
    data = {
        "id": run_id.hex,
        "name": name,
        "run_type": run_type,
        "inputs": inputs,
        "start_time": datetime.utcnow().isoformat(),
        # "session_name": "project-name",  # the name of the project to trace to
        # "session_id": "project-id",  # the ID of the project to trace to. specify one of session_name or session_id
    }
    if parent_id:
        data["parent_run_id"] = parent_id.hex

    requests.post(
        "https://api.smith.langchain.com/runs",  # Update appropriately for self-hosted installations or the EU region
        json=data,
        headers=headers
    )

def patch_run(run_id, outputs):
    """Function to patch a run with outputs."""
    requests.patch(
        f"https://api.smith.langchain.com/runs/{run_id}",
        json={
            "outputs": outputs,
            "end_time": datetime.now(timezone.utc).isoformat(),
        },
        headers=headers,
    )

# This can be a user input to your app
question = "Can you summarize this morning's meetings?"

# This can be retrieved in a retrieval step
context = "During this morning's meeting, we solved all world conflict."

messages = [
    {"role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context."},
    {"role": "user", "content": f"Question: {question}\nContext: {context}"}
]

# Create parent run
parent_run_id = uuid4()
post_run(parent_run_id, "Chat Pipeline", "chain", {"question": question})

# Create child run
child_run_id = uuid4()
post_run(child_run_id, "OpenAI Call", "llm", {"messages": messages}, parent_run_id)

# Generate a completion
client = openai.Client()
chat_completion = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages
)

# End runs
patch_run(child_run_id, chat_completion.dict())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
자세한 내용은 Run (span) 데이터 형식 문서를 참조하세요.

배치 수집

더 빠른 실행 수집과 더 높은 rate limit을 위해 POST /runs/multipart link 엔드포인트를 사용할 수 있습니다. 아래는 예제입니다. 실행하려면 orjson(빠른 JSON 처리용)과 requests_toolbelt가 필요합니다.
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder

def create_dotted_order(
    start_time: datetime | None = None,
    run_id: uuid.UUID | None = None
) -> str:
    """Create a dotted order string for run ordering and hierarchy.

    The dotted order is used to establish the sequence and relationships between runs.
    It combines a timestamp with a unique identifier to ensure proper ordering and tracing.
    """
    st = start_time or datetime.now(timezone.utc)
    id_ = run_id or uuid.uuid4()
    return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"

def create_run_base(
    name: str,
    run_type: str,
    inputs: dict,
    start_time: datetime
) -> dict:
    """Create the base structure for a run."""
    run_id = uuid.uuid4()
    return {
        "id": str(run_id),
        "trace_id": str(run_id),
        "name": name,
        "start_time": start_time.isoformat(),
        "inputs": inputs,
        "run_type": run_type,
    }

def construct_run(
    name: str,
    run_type: str,
    inputs: dict,
    parent_dotted_order: str | None = None,
) -> dict:
    """Construct a run dictionary with the given parameters.

    This function creates a run with a unique ID and dotted order, establishing its place
    in the trace hierarchy if it's a child run.
    """
    start_time = datetime.now(timezone.utc)
    run = create_run_base(name, run_type, inputs, start_time)
    current_dotted_order = create_dotted_order(start_time, uuid.UUID(run["id"]))

    if parent_dotted_order:
        current_dotted_order = f"{parent_dotted_order}.{current_dotted_order}"
        run["trace_id"] = parent_dotted_order.split(".")[0].split("Z")[1]
        run["parent_run_id"] = parent_dotted_order.split(".")[-1].split("Z")[1]

    run["dotted_order"] = current_dotted_order
    return run

def serialize_run(operation: str, run_data: dict) -> List[tuple]:
    """Serialize a run for the multipart request.

    This function separates the run data into parts for efficient transmission and storage.
    The main run data and optional fields (inputs, outputs, events) are serialized separately.
    """
    run_id = run_data.get("id", str(uuid.uuid4()))

    # Separate optional fields
    inputs = run_data.pop("inputs", None)
    outputs = run_data.pop("outputs", None)
    events = run_data.pop("events", None)

    parts = []

    # Serialize main run data
    run_data_json = json.dumps(run_data).encode("utf-8")
    parts.append(
        (
            f"{operation}.{run_id}",
            (
                None,
                run_data_json,
                "application/json",
                {"Content-Length": str(len(run_data_json))},
            ),
        )
    )

    # Serialize optional fields
    for key, value in [("inputs", inputs), ("outputs", outputs), ("events", events)]:
        if value:
            serialized_value = json.dumps(value).encode("utf-8")
            parts.append(
                (
                    f"{operation}.{run_id}.{key}",
                    (
                        None,
                        serialized_value,
                        "application/json",
                        {"Content-Length": str(len(serialized_value))},
                    ),
                )
            )

    return parts

def batch_ingest_runs(
    api_url: str,
    api_key: str,
    posts: list[dict] | None = None,
    patches: list[dict] | None = None,
) -> None:
    """Ingest multiple runs in a single batch request.

    This function handles both creating new runs (posts) and updating existing runs (patches).
    It's more efficient for ingesting multiple runs compared to individual API calls.
    """
    boundary = uuid.uuid4().hex
    all_parts = []

    for operation, runs in zip(("post", "patch"), (posts, patches)):
        if runs:
            all_parts.extend(
                [part for run in runs for part in serialize_run(operation, run)]
            )

    encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
    headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}

    try:
        response = requests.post(
            f"{api_url}/runs/multipart",
            data=encoder,
            headers=headers
        )
        response.raise_for_status()
        print("Successfully ingested runs.")
    except requests.RequestException as e:
        print(f"Error ingesting runs: {e}")
        # In a production environment, you might want to log this error or handle it more robustly

# Configure API URL and key
# For production use, consider using a configuration file or environment variables
api_url = "https://api.smith.langchain.com"
api_key = os.environ.get("LANGSMITH_API_KEY")

if not api_key:
    raise ValueError("LANGSMITH_API_KEY environment variable is not set")

# Create a parent run
parent_run = construct_run(
    name="Parent Run",
    run_type="chain",
    inputs={"main_question": "Tell me about France"},
)

# Create a child run, linked to the parent
child_run = construct_run(
    name="Child Run",
    run_type="llm",
    inputs={"question": "What is the capital of France?"},
    parent_dotted_order=parent_run["dotted_order"],
)

# First, post the runs to create them
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)

# Then, update the runs with their end times and any outputs
child_run_update = {
    **child_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"answer": "Paris is the capital of France."},
}

parent_run_update = {
    **parent_run,
    "end_time": datetime.now(timezone.utc).isoformat(),
    "outputs": {"summary": "Discussion about France, including its capital."},
}

patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)

# Note: This example requires the `requests` and `requests_toolbelt` libraries.
# You can install them using pip:
# pip install requests requests_toolbelt

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I