고객 지원 이메일 에이전트를 개별 단계로 분해하여 LangGraph로 에이전트를 구축하는 사고 방식을 배웁니다
LangGraph는 여러분이 구축하는 에이전트에 대한 사고 방식을 바꿀 수 있습니다. LangGraph로 에이전트를 구축할 때는 먼저 노드라고 불리는 개별 단계로 분해합니다. 그런 다음 각 노드에 대한 다양한 의사 결정과 전환을 설명합니다. 마지막으로 각 노드가 읽고 쓸 수 있는 공유 상태를 통해 노드들을 연결합니다. 이 튜토리얼에서는 LangGraph로 고객 지원 이메일 에이전트를 구축하는 사고 과정을 안내합니다.
프로세스에서 구별되는 단계들을 식별하는 것부터 시작합니다. 각 단계는 노드(특정한 하나의 작업을 수행하는 함수)가 됩니다. 그런 다음 이러한 단계들이 서로 어떻게 연결되는지 스케치합니다.화살표는 가능한 경로를 보여주지만, 어떤 경로를 선택할지에 대한 실제 결정은 각 노드 내부에서 발생합니다.이제 워크플로의 구성 요소를 식별했으므로, 각 노드가 수행해야 하는 작업을 이해해 봅시다:
Read Email: 이메일 콘텐츠 추출 및 파싱
Classify Intent: LLM을 사용하여 긴급도와 주제를 분류한 다음 적절한 작업으로 라우팅
Doc Search: 지식 베이스에서 관련 정보 쿼리
Bug Track: 추적 시스템에서 이슈 생성 또는 업데이트
Draft Reply: 적절한 응답 생성
Human Review: 승인 또는 처리를 위해 담당자에게 에스컬레이션
Send Reply: 이메일 응답 발송
일부 노드는 다음에 어디로 갈지 결정하고(Classify Intent, Draft Reply, Human Review), 다른 노드는 항상 동일한 다음 단계로 진행합니다(Read Email은 항상 Classify Intent로 이동하고, Doc Search는 항상 Draft Reply로 이동).
각 노드를 간단한 함수로 구현하겠습니다. 기억하세요: 노드는 상태를 받아 작업을 수행하고 업데이트를 반환합니다.
읽기 및 분류 노드
Copy
from typing import Literalfrom langgraph.graph import StateGraph, START, ENDfrom langgraph.types import interrupt, Command, RetryPolicyfrom langchain_openai import ChatOpenAIfrom langchain_core.messages import HumanMessagellm = ChatOpenAI(model="gpt-4")def read_email(state: EmailAgentState) -> dict: """Extract and parse email content""" # In production, this would connect to your email service return { "messages": [HumanMessage(content=f"Processing email: {state['email_content']}")] }def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]: """Use LLM to classify email intent and urgency, then route accordingly""" # Create structured LLM that returns EmailClassification dict structured_llm = llm.with_structured_output(EmailClassification) # Format the prompt on-demand, not stored in state classification_prompt = f""" Analyze this customer email and classify it: Email: {state['email_content']} From: {state['sender_email']} Provide classification including intent, urgency, topic, and summary. """ # Get structured response directly as dict classification = structured_llm.invoke(classification_prompt) # Determine next node based on classification if classification['intent'] == 'billing' or classification['urgency'] == 'critical': goto = "human_review" elif classification['intent'] in ['question', 'feature']: goto = "search_documentation" elif classification['intent'] == 'bug': goto = "bug_tracking" else: goto = "draft_response" # Store classification as a single dict in state return Command( update={"classification": classification}, goto=goto )
검색 및 추적 노드
Copy
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]: """Search knowledge base for relevant information""" # Build search query from classification classification = state.get('classification', {}) query = f"{classification.get('intent', '')} {classification.get('topic', '')}" try: # Implement your search logic here # Store raw search results, not formatted text search_results = [ "Reset password via Settings > Security > Change Password", "Password must be at least 12 characters", "Include uppercase, lowercase, numbers, and symbols" ] except SearchAPIError as e: # For recoverable search errors, store error and continue search_results = [f"Search temporarily unavailable: {str(e)}"] return Command( update={"search_results": search_results}, # Store raw results or error goto="draft_response" )def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]: """Create or update bug tracking ticket""" # Create ticket in your bug tracking system ticket_id = "BUG-12345" # Would be created via API return Command( update={ "search_results": [f"Bug ticket {ticket_id} created"], "current_step": "bug_tracked" }, goto="draft_response" )
응답 노드
Copy
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]: """Generate response using context and route based on quality""" classification = state.get('classification', {}) # Format context from raw state data on-demand context_sections = [] if state.get('search_results'): # Format search results for the prompt formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']]) context_sections.append(f"Relevant documentation:\n{formatted_docs}") if state.get('customer_history'): # Format customer data for the prompt context_sections.append(f"Customer tier: {state['customer_history'].get('tier', 'standard')}") # Build the prompt with formatted context draft_prompt = f""" Draft a response to this customer email: {state['email_content']} Email intent: {classification.get('intent', 'unknown')} Urgency level: {classification.get('urgency', 'medium')} {chr(10).join(context_sections)} Guidelines: - Be professional and helpful - Address their specific concern - Use the provided documentation when relevant """ response = llm.invoke(draft_prompt) # Determine if human review needed based on urgency and intent needs_review = ( classification.get('urgency') in ['high', 'critical'] or classification.get('intent') == 'complex' ) # Route to appropriate next node goto = "human_review" if needs_review else "send_reply" return Command( update={"draft_response": response.content}, # Store only the raw response goto=goto )def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]: """Pause for human review using interrupt and route based on decision""" classification = state.get('classification', {}) # interrupt() must come first - any code before it will re-run on resume human_decision = interrupt({ "email_id": state.get('email_id',''), "original_email": state.get('email_content',''), "draft_response": state.get('draft_response',''), "urgency": classification.get('urgency'), "intent": classification.get('intent'), "action": "Please review and approve/edit this response" }) # Now process the human's decision if human_decision.get("approved"): return Command( update={"draft_response": human_decision.get("edited_response", state.get('draft_response',''))}, goto="send_reply" ) else: # Rejection means human will handle directly return Command(update={}, goto=END)def send_reply(state: EmailAgentState) -> dict: """Send the email response""" # Integrate with email service print(f"Sending reply: {state['draft_response'][:100]}...") return {}
이제 노드들을 작동하는 그래프로 연결합니다. 노드들이 자체 라우팅 결정을 처리하므로, 몇 가지 필수 에지만 필요합니다.interrupt()로 human-in-the-loop를 활성화하려면 실행 간 상태를 저장하기 위한 checkpointer로 컴파일해야 합니다:
그래프 컴파일 코드
Copy
from langgraph.checkpoint.memory import MemorySaverfrom langgraph.types import RetryPolicy# Create the graphworkflow = StateGraph(EmailAgentState)# Add nodes with appropriate error handlingworkflow.add_node("read_email", read_email)workflow.add_node("classify_intent", classify_intent)# Add retry policy for nodes that might have transient failuresworkflow.add_node( "search_documentation", search_documentation, retry_policy=RetryPolicy(max_attempts=3))workflow.add_node("bug_tracking", bug_tracking)workflow.add_node("draft_response", draft_response)workflow.add_node("human_review", human_review)workflow.add_node("send_reply", send_reply)# Add only the essential edgesworkflow.add_edge(START, "read_email")workflow.add_edge("read_email", "classify_intent")workflow.add_edge("send_reply", END)# Compile with checkpointer for persistence, in case run graph with Local_Server --> Please compile without checkpointermemory = MemorySaver()app = workflow.compile(checkpointer=memory)
그래프 구조는 최소화되어 있습니다. 라우팅이 Command 객체를 통해 노드 내부에서 발생하기 때문입니다. 각 노드는 Command[Literal["node1", "node2"]]와 같은 타입 힌트를 사용하여 이동할 수 있는 위치를 선언하므로, 흐름이 명시적이고 추적 가능합니다.
# Test with an urgent billing issueinitial_state = { "email_content": "I was charged twice for my subscription! This is urgent!", "sender_email": "[email protected]", "email_id": "email_123", "messages": []}# Run with a thread_id for persistenceconfig = {"configurable": {"thread_id": "customer_123"}}result = app.invoke(initial_state, config)# The graph will pause at human_reviewprint(f"Draft ready for review: {result['draft_response'][:100]}...")# When ready, provide human input to resumefrom langgraph.types import Commandhuman_response = Command( resume={ "approved": True, "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund..." })# Resume executionfinal_result = app.invoke(human_response, config)print(f"Email sent successfully!")
그래프는 interrupt()에 도달하면 일시 중지되고, 모든 것을 checkpointer에 저장한 후 대기합니다. 며칠 후에도 재개할 수 있으며, 중단된 정확한 지점에서 다시 시작합니다. thread_id는 이 대화에 대한 모든 상태가 함께 보존되도록 보장합니다.
이 섹션에서는 노드 세분화 설계의 트레이드오프를 살펴봅니다. 대부분의 애플리케이션은 이를 건너뛰고 위에 표시된 패턴을 사용할 수 있습니다.
궁금할 수 있습니다: Read Email과 Classify Intent를 하나의 노드로 결합하면 안 될까요?또는 Doc Search를 Draft Reply에서 분리하는 이유는 무엇일까요?답은 복원력과 관찰 가능성 간의 트레이드오프와 관련이 있습니다.복원력 고려사항: LangGraph의 내구성 있는 실행은 노드 경계에서 체크포인트를 생성합니다. 워크플로가 중단 또는 실패 후 재개될 때, 실행이 중지된 노드의 시작 부분에서 시작합니다. 노드가 작을수록 체크포인트가 더 자주 생성되며, 이는 문제가 발생했을 때 반복할 작업이 줄어듭니다. 여러 작업을 하나의 큰 노드로 결합하면, 끝 부분 근처에서 실패했을 때 해당 노드의 시작 부분부터 모든 것을 다시 실행해야 합니다.이메일 에이전트에 대해 이러한 분해를 선택한 이유:
외부 서비스 격리: Doc Search와 Bug Track은 외부 API를 호출하기 때문에 별도의 노드입니다. 검색 서비스가 느리거나 실패하면 LLM 호출과 격리하고 싶습니다. 다른 노드에 영향을 주지 않고 이러한 특정 노드에 재시도 정책을 추가할 수 있습니다.
중간 가시성:Classify Intent를 자체 노드로 두면 조치를 취하기 전에 LLM이 결정한 내용을 검사할 수 있습니다. 이는 디버깅 및 모니터링에 유용합니다 - 에이전트가 언제 어떤 이유로 담당자 검토로 라우팅하는지 정확히 볼 수 있습니다.
서로 다른 실패 모드: LLM 호출, 데이터베이스 조회, 이메일 전송은 서로 다른 재시도 전략을 가집니다. 별도의 노드를 사용하면 이를 독립적으로 구성할 수 있습니다.
재사용성 및 테스트: 작은 노드는 격리된 상태에서 테스트하고 다른 워크플로에서 재사용하기 쉽습니다.
다른 유효한 접근 방식: Read Email과 Classify Intent를 단일 노드로 결합할 수 있습니다. 분류 전 원시 이메일을 검사하는 기능을 잃고 해당 노드의 모든 실패 시 두 작업을 모두 반복하게 됩니다. 대부분의 애플리케이션에서 별도 노드의 관찰 가능성과 디버깅 이점이 트레이드오프의 가치가 있습니다.애플리케이션 수준 고려사항: 단계 2의 캐싱 논의(검색 결과를 캐싱할지 여부)는 LangGraph 프레임워크 기능이 아닌 애플리케이션 수준 결정입니다. 특정 요구사항에 따라 노드 함수 내에서 캐싱을 구현합니다 - LangGraph는 이를 규정하지 않습니다.성능 고려사항: 노드가 많다고 해서 실행이 느려지는 것은 아닙니다. LangGraph는 기본적으로 백그라운드에서 체크포인트를 작성하므로(비동기 내구성 모드), 그래프가 체크포인트 완료를 기다리지 않고 계속 실행됩니다. 이는 최소한의 성능 영향으로 빈번한 체크포인트를 얻는다는 것을 의미합니다. 필요한 경우 이 동작을 조정할 수 있습니다 - 완료 시에만 체크포인트하려면 "exit" 모드를 사용하거나, 각 체크포인트가 작성될 때까지 실행을 차단하려면 "sync" 모드를 사용합니다.