什么是LangGraph?
LangGraph是一个专为LLM应用设计的工作流编排框架。其核心原则是:
- 将复杂任务分解为状态和转换
- 管理状态转换逻辑
- 处理任务执行过程中的各种异常
想象一下购物过程:浏览 → 加入购物车 → 结账 → 支付。LangGraph帮助我们高效地管理这样的工作流。
核心概念
1. 状态
状态就像任务执行中的检查点:
from typing import TypedDict, List
class ShoppingState(TypedDict):
# 当前状态
current_step: str
# 购物车商品
cart_items: List[str]
# 总金额
total_amount: float
# 用户输入
user_input: str
class ShoppingGraph(StateGraph):
def __init__(self):
super().__init__()
# 定义状态
self.add_node("browse", self.browse_products)
self.add_node("add_to_cart", self.add_to_cart)
self.add_node("checkout", self.checkout)
self.add_node("payment", self.payment)
2. 状态转换
状态转换定义了任务流程的“路线图”:
class ShoppingController:
def define_transitions(self):
# 添加转换规则
self.graph.add_edge("browse", "add_to_cart")
self.graph.add_edge("add_to_cart", "browse")
self.graph.add_edge("add_to_cart", "checkout")
self.graph.add_edge("checkout", "payment")
def should_move_to_cart(self, state: ShoppingState) -> bool:
"""确定是否应转换到购物车状态"""
return "add to cart" in state["user_input"].lower()
3. 状态持久化
为了确保系统可靠性,我们需要持久化状态信息:
class StateManager:
def __init__(self):
self.redis_client = redis.Redis()
def save_state(self, session_id: str, state: dict):
"""将状态保存到Redis"""
self.redis_client.set(
f"shopping_state:{session_id}",
json.dumps(state),
ex=3600 # 1小时过期
)
def load_state(self, session_id: str) -> dict:
"""从Redis加载状态"""
state_data = self.redis_client.get(f"shopping_state:{session_id}")
return json.loads(state_data) if state_data else None
4. 错误恢复机制
任何步骤都可能失败,我们需要优雅地处理这些情况:
class ErrorHandler:
def __init__(self):
self.max_retries = 3
async def with_retry(self, func, state: dict):
"""带重试机制的函数执行"""
retries = 0
while retries < self.max_retries:
try:
return await func(state)
except Exception as e:
retries += 1
if retries == self.max_retries:
return self.handle_final_error(e, state)
await self.handle_retry(e, state, retries)
def handle_final_error(self, error, state: dict):
"""处理最终错误"""
# 保存错误状态
state["error"] = str(error)
# 回滚到最后一个稳定状态
return self.rollback_to_last_stable_state(state)
实际案例:智能客服系统
让我们来看一个实际例子——智能客服系统:
from langgraph.graph import StateGraph, State
class CustomerServiceState(TypedDict):
conversation_history: List[str]
current_intent: str
user_info: dict
resolved: bool
class CustomerServiceGraph(StateGraph):
def __init__(self):
super().__init__()
# 初始化状态
self.add_node("greeting", self.greet_customer)
self.add_node("understand_intent", self.analyze_intent)
self.add_node("handle_query", self.process_query)
self.add_node("confirm_resolution", self.check_resolution)
async def greet_customer(self, state: State):
"""问候客户"""
response = await self.llm.generate(
prompt=f"""
Conversation history: {state['conversation_history']}
Task: Generate appropriate greeting
Requirements:
1. Maintain professional friendliness
2. Acknowledge returning customers
3. Ask how to help
"""
)
state['conversation_history'].append(f"Assistant: {response}")
return state
async def analyze_intent(self, state: State):
"""理解用户意图"""
response = await self.llm.generate(
prompt=f"""
Conversation history: {state['conversation_history']}
Task: Analyze user intent
Output format:
{{
"intent": "refund/inquiry/complaint/other",
"confidence": 0.95,
"details": "specific description"
}}
"""
)
state['current_intent'] = json.loads(response)
return state
用法
# 初始化系统
graph = CustomerServiceGraph()
state_manager = StateManager()
error_handler = ErrorHandler()
async def handle_customer_query(user_id: str, message: str):
# 加载或创建状态
state = state_manager.load_state(user_id) or {
"conversation_history": [],
"current_intent": None,
"user_info": {},
"resolved": False
}
# 添加用户消息
state["conversation_history"].append(f"User: {message}")
# 执行状态机流程
try:
result = await graph.run(state)
# 保存状态
state_manager.save_state(user_id, result)
return result["conversation_history"][-1]
except Exception as e:
return await error_handler.with_retry(
graph.run,
state
)
最佳实践
- 状态设计原则
- 保持状态简单明了
- 仅存储必要信息
- 考虑序列化要求
- 转换逻辑优化
- 使用条件转换
- 避免无限循环
- 设置最大步骤限制
- 错误处理策略
- 实现优雅降级
- 记录详细信息
- 提供回滚机制
- 性能优化
- 使用异步操作
- 实现状态缓存
- 控制状态大小
常见陷阱及解决方案
- 状态爆炸
- 问题:状态过多导致维护困难
- 解决方案:合并相似状态,使用状态组合而非创建新状态
- 死锁情况
- 问题:循环状态转换导致任务挂起
- 解决方案:添加超时机制和强制退出条件
- 状态一致性
- 问题:分布式环境中的状态不一致
- 解决方案:使用分布式锁和事务机制
总结
LangGraph状态机为管理复杂的AI代理任务流提供了强大的解决方案:
- 清晰的任务流管理
- 可靠的状态持久化
- 全面的错误处理
- 灵活的可扩展性