Skip to main content
권장 읽기 자료이 내용을 살펴보기 전에 다음 문서를 읽어보시면 도움이 됩니다:
대용량 트레이스를 내보내려는 경우, 대량 데이터 내보내기 기능을 사용하는 것을 권장합니다. 이 방법은 대용량 데이터를 더 효과적으로 처리하며, 자동 재시도 및 파티션 간 병렬 처리를 지원합니다.
run(LangSmith 트레이스의 span 데이터)을 쿼리하는 권장 방법은 SDK의 list_runs 메서드나 API의 /runs/query 엔드포인트를 사용하는 것입니다. LangSmith는 트레이스를 Run (span) 데이터 형식에 명시된 간단한 형식으로 저장합니다.

필터 인수 사용하기

간단한 쿼리의 경우, 쿼리 문법에 의존할 필요가 없습니다. 필터 인수 참조에 명시된 필터 인수를 사용할 수 있습니다.
필수 조건아래 코드 스니펫을 실행하기 전에 클라이언트를 초기화해야 합니다.
from langsmith import Client

client = Client()
다음은 키워드 인수를 사용하여 run을 나열하는 몇 가지 예시입니다:

프로젝트의 모든 run 나열하기

project_runs = client.list_runs(project_name="<your_project>")

최근 24시간 내 LLM 및 Chat run 나열하기

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

프로젝트의 루트 run 나열하기

루트 run은 부모가 없는 run입니다. 이러한 run은 is_rootTrue 값이 할당됩니다. 이를 사용하여 루트 run을 필터링할 수 있습니다.
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

오류가 없는 run 나열하기

correct_runs = client.list_runs(project_name="<your_project>", error=False)

run ID로 run 나열하기

다른 인수 무시됨위에 설명된 방식으로 run ID 목록을 제공하면 project_name, run_type 등과 같은 다른 모든 필터링 인수가 무시되고 지정된 ID와 일치하는 run만 직접 반환됩니다.
run ID 목록이 있는 경우 직접 나열할 수 있습니다:
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

필터 쿼리 언어 사용하기

더 복잡한 쿼리의 경우 필터 쿼리 언어 참조에 설명된 쿼리 언어를 사용할 수 있습니다.

대화 스레드의 모든 루트 run 나열하기

이는 대화 스레드에서 run을 가져오는 방법입니다. 스레드 설정에 대한 자세한 내용은 스레드 설정 방법 가이드를 참조하세요. 스레드는 공유 스레드 ID를 설정하여 그룹화됩니다. LangSmith UI에서는 다음 세 가지 메타데이터 키 중 하나를 사용할 수 있습니다: session_id, conversation_id, 또는 thread_id. 세션 ID는 추적 프로젝트 ID라고도 합니다. 다음 쿼리는 이들 중 하나와 일치합니다.
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

트레이스의 루트에 “user_score” 피드백 점수 1이 할당된 “extractor”라는 모든 run 나열하기

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

점수가 4보다 큰 “star_rating” 키를 가진 run 나열하기

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

완료하는 데 5초 이상 걸린 run 나열하기

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

”error”가 null이 아닌 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

start_time이 특정 타임스탬프보다 큰 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

”substring” 문자열을 포함하는 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='search("substring")')

git 해시 “2aa1cf4”로 태그된 모든 run 나열하기

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

특정 타임스탬프 이후에 시작되고 “error”가 null이 아니거나 “Correctness” 피드백 점수가 0인 모든 run 나열하기

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

복잡한 쿼리: 태그에 “experimental” 또는 “beta”가 포함되고 지연 시간이 2초보다 큰 모든 run 나열하기

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

전체 텍스트로 트레이스 트리 검색하기

특정 필드 없이 search() 함수를 사용하여 run의 모든 문자열 필드에 대해 전체 텍스트 검색을 수행할 수 있습니다. 이를 통해 검색어와 일치하는 트레이스를 빠르게 찾을 수 있습니다.
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

메타데이터 존재 여부 확인하기

메타데이터의 존재 여부를 확인하려면 eq 연산자를 사용하고, 선택적으로 and 문을 사용하여 값으로 일치시킬 수 있습니다. 이는 run에 대한 보다 구조화된 정보를 기록하려는 경우 유용합니다.
to_search = {
    "user_id": ""
}

# Check for any run with the "user_id" metadata key
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# Check for runs with user_id=4070f233-f61e-44eb-bff1-da3c163895a3
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

메타데이터에서 환경 세부 정보 확인하기

메타데이터를 통해 트레이스에 환경 정보를 추가하는 것이 일반적인 패턴입니다. 환경 메타데이터를 포함하는 run을 필터링하려면 위와 동일한 패턴을 사용할 수 있습니다:
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

메타데이터에서 대화 ID 확인하기

동일한 대화의 트레이스를 연결하는 또 다른 일반적인 방법은 공유 대화 ID를 사용하는 것입니다. 이러한 방식으로 대화 ID를 기반으로 run을 필터링하려면 메타데이터에서 해당 ID를 검색할 수 있습니다.
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

키-값 쌍의 부정 필터링

메타데이터, 입력 및 출력 키-값 쌍에 부정 필터링을 사용하여 결과에서 특정 run을 제외할 수 있습니다. 다음은 메타데이터 키-값 쌍에 대한 몇 가지 예시이지만, 동일한 로직이 입력 및 출력 키-값 쌍에도 적용됩니다.
# Find all runs where the metadata does not contain a "conversation_id" key
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# Find all runs where the conversation_id in metadata is not "a1b2c3d4-e5f6-7890"
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where there is no "conversation_id" metadata key and the "a1b2c3d4-e5f6-7890" value is not present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# Find all runs where the conversation_id metadata key is not present but the "a1b2c3d4-e5f6-7890" value is present
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

여러 필터 결합하기

검색을 세분화하기 위해 여러 조건을 결합하려면 다른 필터링 함수와 함께 and 연산자를 사용할 수 있습니다. 다음은 “ChatOpenAI”라는 이름의 run을 검색하면서 메타데이터에 특정 conversation_id를 가진 run을 찾는 방법입니다:
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

트리 필터

루트 run에 “user_score” 피드백 1이 있고 전체 트레이스의 어느 run이 “ExpandQuery”라는 이름을 가진 경우, “RetrieveDocs”라는 이름의 모든 run을 나열합니다. 이러한 유형의 쿼리는 트레이스 내에서 다양한 상태나 단계에 도달했는지를 조건으로 특정 run을 추출하려는 경우 유용합니다.
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

고급: 자식 도구 사용을 포함한 평면화된 트레이스 뷰 내보내기

다음 Python 예시는 각 트레이스 내에서 에이전트가 사용한 도구(중첩된 run에서)에 대한 정보를 포함하여 트레이스의 평면화된 뷰를 내보내는 방법을 보여줍니다. 이를 사용하여 여러 트레이스에 걸쳐 에이전트의 동작을 분석할 수 있습니다. 이 예시는 지정된 일수 내의 모든 도구 run을 쿼리하고 부모(루트) run ID별로 그룹화합니다. 그런 다음 각 루트 run에 대한 관련 정보(run 이름, 입력, 출력 등)를 가져와 자식 run 정보와 결합합니다. 쿼리를 최적화하기 위해 이 예시는:
  1. 도구 run을 쿼리할 때 필요한 필드만 선택하여 쿼리 시간을 줄입니다.
  2. 도구 run을 동시에 처리하면서 루트 run을 배치로 가져옵니다.
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# List all tool runs
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # We don't need to fetch inputs, outputs, and other values that # may increase the query time
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group tool runs by parent run ID
    for run in tqdm(tool_runs):
        # Collect all tools invoked within a given trace
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the tool runs
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

고급: 피드백이 있는 트레이스의 리트리버 입출력 내보내기

이 쿼리는 임베딩을 미세 조정하거나 리트리버 동작을 기반으로 엔드투엔드 시스템 성능 문제를 진단하려는 경우 유용합니다. 다음 Python 예시는 특정 피드백 점수를 가진 트레이스 내에서 리트리버 입력 및 출력을 내보내는 방법을 보여줍니다.
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# List all tool runs
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # This time we do want to fetch the inputs and outputs, since they
    # may be adjusted by query expansion steps.
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# Do not exceed rate limit
with ThreadPoolExecutor(max_workers=2) as executor:
    # Group retriever runs by parent run ID
    for run in tqdm(retriever_runs):
        # Collect all retriever calls invoked within a given trace
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # Extend the docs
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # maybe send a batch of parent run IDs to the server
        # this lets us query for the root runs in batches
        # while still processing the retriever runs
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (Optional): Convert to a pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

Connect these docs programmatically to Claude, VSCode, and more via MCP for real-time answers.
I