LangGraph 入门教程:构建智能多步骤 AI 应用的完整指南
前言
随着大型语言模型(LLM)技术的飞速发展,越来越多的开发者开始探索如何将 AI 能力融入到复杂的业务流程中。然而,单纯的"问答式"调用已经无法满足日益复杂的应用需求——我们需要的是能够自主推理、动态决策、多步骤执行的智能系统。
LangGraph 正是在这样的背景下应运而生。作为 LangChain 生态系统中的重要组成部分,LangGraph 提供了一套基于图结构的框架,让开发者能够以直观、灵活的方式构建具备状态管理能力的多代理(Multi-Agent)应用程序。
本文将从零开始,系统性地介绍 LangGraph 的核心概念、安装配置、基础用法,并通过实际案例帮助你快速上手这一强大工具。
一、什么是 LangGraph?
1.1 基本概念
LangGraph 是由 LangChain 团队开发的一个开源框架,专门用于构建有状态的、多角色的 LLM 应用。它的核心思想是将 AI 工作流抽象为一张有向图(Directed Graph),其中:
节点(Node):代表具体的计算单元,可以是 LLM 调用、工具执行、自定义函数等
边(Edge):代表节点之间的流转关系,控制数据和控制流的走向
状态(State):贯穿整个图执行过程的共享数据结构,记录当前工作流的完整上下文
与传统的链式(Chain)调用不同,LangGraph 支持循环(Cycles),这使得它能够实现真正的迭代推理——让 AI 能够"反思"自己的输出,并根据结果决定是否继续执行、调整策略或终止流程。
1.2 为什么需要 LangGraph?
在 LangGraph 出现之前,开发者通常使用以下方式构建 AI 应用:
LangGraph 的出现填补了这一空白,它提供了:
精确的流程控制:开发者可以明确定义每个步骤的执行条件和流转逻辑
完善的状态管理:内置状态持久化机制,支持断点续传和人工干预
可视化支持:工作流图结构天然支持可视化,便于调试和理解
高度可扩展性:支持多代理协作,适合构建复杂的企业级应用
1.3 LangGraph 与 LangChain 的关系
很多初学者会混淆 LangGraph 和 LangChain 的关系。简单来说:
LangChain 提供了与 LLM 交互的基础组件(模型包装、提示模板、文档处理等)
LangGraph 在 LangChain 之上提供了编排层,专注于多步骤工作流的设计与执行
两者并非替代关系,而是相辅相成。LangGraph 内部可以使用 LangChain 的各类组件,而 LangChain 本身也在逐步集成 LangGraph 的能力。
二、核心概念详解
在动手编写代码之前,我们需要深入理解 LangGraph 的几个核心概念。
2.1 状态(State)
状态是 LangGraph 的灵魂。每一个 LangGraph 应用都需要定义一个状态类,它描述了整个工作流运行过程中需要追踪的所有信息。
状态通常使用 Python 的 TypedDict 或 Pydantic 模型来定义:
from typing import TypedDict, Annotated, List
from langgraph.graph import add_messages
class AgentState(TypedDict):
# 消息历史列表,使用 add_messages 注解实现自动追加
messages: Annotated[list, add_messages]
# 当前任务描述
task: str
# 迭代次数计数器
iteration_count: int
# 最终结果
final_result: str
这里有几个关键点需要注意:
Annotated:用于为字段添加额外的元数据,如更新策略
add_messages:LangGraph 内置的归约函数(Reducer),用于将新消息追加到消息列表,而非覆盖
状态的字段可以在不同节点之间共享和修改
2.2 节点(Nodes)
节点是工作流中的执行单元,本质上就是一个 Python 函数,接收当前状态作为输入,返回更新后的状态部分:
def my_node(state: AgentState) -> dict:
# 处理逻辑
result = do_something(state["task"])
# 返回需要更新的状态字段
return {"final_result": result}
节点函数有以下特点:
输入必须是完整的状态对象
输出是一个字典,只需包含需要更新的字段
可以是普通的 Python 函数,也可以是异步函数(
async def)可以包含任意复杂的逻辑:调用 LLM、执行数据库查询、调用外部 API 等
2.3 边(Edges)
边定义了节点之间的流转关系,LangGraph 支持三种类型的边:
普通边(Normal Edge)
直接连接两个节点,无条件流转:
graph.add_edge("node_a", "node_b")
条件边(Conditional Edge)
根据当前状态动态决定下一个节点:
def route_decision(state: AgentState) -> str:
if state["iteration_count"] >= 3:
return "end"
elif state["final_result"]:
return "output_node"
else:
return "retry_node"
graph.add_conditional_edges(
"decision_node",
route_decision,
{
"end": END,
"output_node": "output_node",
"retry_node": "retry_node"
}
)
入口点和结束点
每个图需要明确指定起始节点和终止条件:
graph.set_entry_point("start_node") # 设置入口
graph.add_edge("final_node", END) # 连接到结束点
2.4 图(Graph)
图是 LangGraph 的顶层容器,负责将节点和边组合成一个可执行的工作流:
from langgraph.graph import StateGraph, END
# 创建图,指定状态类型
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("node_a", node_a_function)
workflow.add_node("node_b", node_b_function)
# 添加边
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
# 编译图
app = workflow.compile()
2.5 检查点(Checkpoints)
检查点是 LangGraph 的一个高级特性,用于持久化工作流状态。通过检查点,你可以:
在工作流中途保存状态,支持断点续传
实现"人在回路"(Human-in-the-Loop)模式,允许人工审核和干预
支持工作流的历史回溯和状态查询
from langgraph.checkpoint.memory import MemorySaver
# 创建内存检查点
checkpointer = MemorySaver()
# 编译时传入检查点
app = workflow.compile(checkpointer=checkpointer)
# 执行时传入线程 ID(用于区分不同会话)
config = {"configurable": {"thread_id": "session_001"}}
result = app.invoke(initial_state, config=config)
三、环境安装与配置
3.1 安装依赖
LangGraph 的安装非常简单,使用 pip 即可完成:
# 安装 LangGraph 核心包
pip install langgraph
# 安装 LangChain 及相关依赖
pip install langchain langchain-openai
# 可选:安装 LangGraph CLI(用于可视化和调试)
pip install langgraph-cli
# 可选:安装持久化检查点支持
pip install langgraph-checkpoint-sqlite
3.2 配置 API 密钥
如果你使用 OpenAI 的模型,需要配置相应的 API 密钥:
import os
# 方式一:直接在代码中设置(不推荐用于生产环境)
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
# 方式二:使用 .env 文件(推荐)
from dotenv import load_dotenv
load_dotenv()
创建 .env 文件:
OPENAI_API_KEY=your-openai-api-key
LANGCHAIN_API_KEY=your-langsmith-api-key # 可选,用于追踪
LANGCHAIN_TRACING_V2=true # 可选,启用追踪
3.3 验证安装
运行以下代码验证安装是否成功:
import langgraph
print(f"LangGraph 版本: {langgraph.__version__}")
from langgraph.graph import StateGraph, END
print("LangGraph 安装成功!")
四、快速入门:构建第一个 LangGraph 应用
让我们从一个最简单的例子开始,构建一个能够进行多轮对话的聊天机器人。
4.1 基础聊天机器人
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
# 第一步:定义状态
class ChatState(TypedDict):
messages: Annotated[list, add_messages]
# 第二步:初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
# 第三步:定义节点函数
def chatbot_node(state: ChatState) -> dict:
"""聊天机器人节点:调用 LLM 生成回复"""
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 第四步:构建图
workflow = StateGraph(ChatState)
workflow.add_node("chatbot", chatbot_node)
workflow.set_entry_point("chatbot")
workflow.add_edge("chatbot", END)
# 第五步:编译
app = workflow.compile()
# 第六步:运行
def chat(user_input: str, history: list = None) -> str:
if history is None:
history = []
history.append(HumanMessage(content=user_input))
result = app.invoke({"messages": history})
ai_message = result["messages"][-1]
return ai_message.content
# 测试
response = chat("你好!请介绍一下 LangGraph。")
print(response)
4.2 为聊天机器人添加工具
一个更实用的聊天机器人需要能够使用工具(如搜索、计算等)。让我们扩展上面的例子:
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
# 定义工具
@tool
def calculate(expression: str) -> str:
"""执行数学计算。输入一个数学表达式,返回计算结果。"""
try:
result = eval(expression)
return f"计算结果: {result}"
except Exception as e:
return f"计算错误: {str(e)}"
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气信息(模拟数据)。"""
# 实际应用中这里会调用真实的天气 API
weather_data = {
"北京": "晴天,温度 25°C,湿度 40%",
"上海": "多云,温度 28°C,湿度 65%",
"广州": "小雨,温度 32°C,湿度 80%"
}
return weather_data.get(city, f"暂无 {city} 的天气数据")
# 定义状态
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
# 初始化带工具的 LLM
tools = [calculate, get_weather]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)
# 定义 Agent 节点
def agent_node(state: AgentState) -> dict:
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
# 构建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))
# 设置入口
workflow.set_entry_point("agent")
# 添加条件边:根据 LLM 是否调用工具决定流转方向
workflow.add_conditional_edges(
"agent",
tools_condition, # 内置的条件函数:有工具调用则去 tools,否则结束
)
# 工具执行完后返回 agent 继续处理
workflow.add_edge("tools", "agent")
# 编译
app = workflow.compile()
# 测试
messages = [HumanMessage(content="帮我计算 123 * 456 + 789,并告诉我北京今天的天气")]
result = app.invoke({"messages": messages})
for message in result["messages"]:
print(f"[{type(message).__name__}]: {message.content}")
五、进阶应用:构建 ReAct Agent
ReAct(Reasoning and Acting)是目前最流行的 AI Agent 模式之一,它让 LLM 交替进行推理(Reasoning)和行动(Acting),从而解决复杂问题。
5.1 ReAct Agent 的工作原理
ReAct 的执行流程如下:
思考(Thought)→ 行动(Action)→ 观察(Observation)→ 思考 → ... → 最终答案
思考:LLM 分析当前问题,决定下一步该做什么
行动:执行具体操作(调用工具、查询数据库等)
观察:获取行动的结果
循环:将观察结果加入上下文,继续思考下一步
5.2 用 LangGraph 实现 ReAct Agent
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
import json
# ============ 工具定义 ============
@tool
def search_web(query: str) -> str:
"""搜索互联网获取信息。"""
# 模拟搜索结果
mock_results = {
"LangGraph": "LangGraph 是 LangChain 团队开发的图结构 AI 工作流框架,支持状态管理和循环执行。",
"Python": "Python 是一种高级编程语言,以简洁易读著称,广泛用于 AI、数据科学和 Web 开发。",
}
for key, value in mock_results.items():
if key.lower() in query.lower():
return value
return f"未找到关于 '{query}' 的相关信息"
@tool
def read_file(filename: str) -> str:
"""读取指定文件的内容。"""
try:
with open(filename, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"文件 '{filename}' 不存在"
except Exception as e:
return f"读取文件时发生错误: {str(e)}"
@tool
def write_file(filename: str, content: str) -> str:
"""将内容写入指定文件。"""
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
return f"成功将内容写入文件 '{filename}'"
except Exception as e:
return f"写入文件时发生错误: {str(e)}"
# ============ 状态定义 ============
class ReActState(TypedDict):
messages: Annotated[list, add_messages]
step_count: int
max_steps: int
# ============ 节点定义 ============
tools = [search_web, read_file, write_file]
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)
SYSTEM_PROMPT = """你是一个有用的 AI 助手,可以使用工具来完成任务。
请按照以下步骤工作:
1. 仔细分析用户的请求
2. 决定是否需要使用工具
3. 如果需要,调用适当的工具获取信息
4. 基于工具返回的结果,继续分析或提供最终答案
5. 确保你的回答准确、完整
可用工具:search_web(搜索信息)、read_file(读取文件)、write_file(写入文件)
"""
def agent_node(state: ReActState) -> dict:
"""Agent 推理节点"""
messages = state["messages"]
# 如果没有系统消息,添加系统提示
if not messages or not isinstance(messages[0], SystemMessage):
messages = [SystemMessage(content=SYSTEM_PROMPT)] + messages
response = llm_with_tools.invoke(messages)
return {
"messages": [response],
"step_count": state.get("step_count", 0) + 1
}
def tool_node(state: ReActState) -> dict:
"""工具执行节点"""
from langchain_core.messages import ToolMessage
last_message = state["messages"][-1]
tool_results = []
# 创建工具映射
tool_map = {t.name: t for t in tools}
# 执行所有工具调用
for tool_call in last_message.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
if tool_name in tool_map:
try:
result = tool_map[tool_name].invoke(tool_args)
tool_results.append(
ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
)
)
except Exception as e:
tool_results.append(
ToolMessage(
content=f"工具执行错误: {str(e)}",
tool_call_id=tool_call["id"]
)
)
return {"messages": tool_results}
def should_continue(state: ReActState) -> Literal["tools", "end"]:
"""决策函数:判断是否继续执行工具"""
last_message = state["messages"][-1]
step_count = state.get("step_count", 0)
max_steps = state.get("max_steps", 10)
# 超过最大步数,强制结束
if step_count >= max_steps:
return "end"
# 如果最后一条消息包含工具调用,继续执行
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "end"
# ============ 构建图 ============
workflow = StateGraph(ReActState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", "end": END}
)
workflow.add_edge("tools", "agent")
react_app = workflow.compile()
# ============ 运行测试 ============
initial_state = {
"messages": [HumanMessage(content="请搜索关于 LangGraph 的信息,并将结果保存到 langgraph_info.txt 文件中")],
"step_count": 0,
"max_steps": 10
}
print("开始执行 ReAct Agent...\n")
for step in react_app.stream(initial_state):
for node_name, node_output in step.items():
print(f"【节点:{node_name}】")
if "messages" in node_output:
last_msg = node_output["messages"][-1]
print(f" 类型: {type(last_msg).__name__}")
if hasattr(last_msg, 'content') and last_msg.content:
print(f" 内容: {last_msg.content[:200]}...")
print()
六、人在回路(Human-in-the-Loop)
在某些关键场景中,我们希望 AI 在执行重要操作之前先获得人工确认。LangGraph 的检查点机制完美支持这一需求。
6.1 实现人工审批流程
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
class ApprovalState(TypedDict):
messages: Annotated[list, add_messages]
pending_action: str
approved: bool
execution_result: str
def analyze_request(state: ApprovalState) -> dict:
"""分析请求,生成待执行的操作"""
# 这里模拟 LLM 分析
user_message = state["messages"][-1].content
# 假设 LLM 决定需要删除某个文件
pending_action = f"准备执行操作:删除文件 important_data.csv(基于用户请求:{user_message})"
return {
"messages": [AIMessage(content=f"我分析了您的请求,需要执行以下操作:\n{pending_action}\n\n请确认是否继续?")],
"pending_action": pending_action,
"approved": False
}
def execute_action(state: ApprovalState) -> dict:
"""执行已批准的操作"""
if state.get("approved", False):
# 执行实际操作
result = f"操作已成功执行:{state['pending_action']}"
else:
result = "操作已被取消"
return {
"messages": [AIMessage(content=result)],
"execution_result": result
}
def check_approval(state: ApprovalState) -> str:
"""检查是否已获得批准"""
if state.get("approved", False):
return "execute"
else:
return "wait" # 等待人工审批
# 构建图
workflow = StateGraph(ApprovalState)
workflow.add_node("analyze", analyze_request)
workflow.add_node("execute", execute_action)
workflow.set_entry_point("analyze")
workflow.add_conditional_edges(
"analyze",
check_approval,
{"execute": "execute", "wait": END} # wait 时暂停,等待外部输入
)
workflow.add_edge("execute", END)
# 使用检查点编译
checkpointer = MemorySaver()
app = workflow.compile(
checkpointer=checkpointer,
interrupt_before=["execute"] # 在执行节点前中断,等待人工确认
)
# 运行示例
config = {"configurable": {"thread_id": "approval_flow_001"}}
# 第一步:发起请求
print("=== 发起请求 ===")
result = app.invoke(
{
"messages": [HumanMessage(content="请清理过期的数据文件")],
"approved": False,
"pending_action": "",
"execution_result": ""
},
config=config
)
print(f"AI 回复: {result['messages'][-1].content}\n")
# 获取当前状态(此时工作流已暂停)
current_state = app.get_state(config)
print(f"当前待执行操作: {current_state.values.get('pending_action', '无')}\n")
# 第二步:人工审批(更新状态)
print("=== 人工审批:批准操作 ===")
app.update_state(config, {"approved": True})
# 第三步:继续执行
print("=== 继续执行 ===")
final_result = app.invoke(None, config=config)
print(f"执行结果: {final_result['execution_result']}")
七、多代理系统(Multi-Agent System)
对于复杂任务,单个代理往往不够用。LangGraph 支持构建多个专业化代理协同工作的系统。
7.1 主管-工作者模式(Supervisor-Worker Pattern)
from typing import TypedDict, Annotated, List, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
# ============ 专业工具 ============
@tool
def code_analysis(code: str) -> str:
"""分析代码质量和潜在问题"""
# 模拟代码分析
issues = []
if "eval(" in code:
issues.append("安全风险:使用了 eval() 函数")
if len(code.split('\n')) > 100:
issues.append("建议:函数过长,考虑拆分")
if not issues:
return "代码分析完成:未发现明显问题"
return f"代码分析完成,发现以下问题:\n" + "\n".join(f"- {i}" for i in issues)
@tool
def generate_tests(function_name: str, description: str) -> str:
"""为指定函数生成单元测试"""
return f"""
# 为 {function_name} 生成的单元测试
import pytest
class Test{function_name.capitalize()}:
def test_basic_functionality(self):
\"\"\"测试基本功能: {description}\"\"\"
# TODO: 实现测试逻辑
pass
def test_edge_cases(self):
\"\"\"测试边界情况\"\"\"
# TODO: 实现边界测试
pass
def test_error_handling(self):
\"\"\"测试错误处理\"\"\"
# TODO: 实现错误测试
pass
"""
@tool
def write_documentation(function_name: str, description: str, params: str) -> str:
"""生成函数文档"""
return f"""
## {function_name}
**描述**: {description}
**参数**:
{params}
**返回值**: 根据函数逻辑返回相应结果
**使用示例**:
```python
result = {function_name}(...)
注意事项: 请确保输入参数符合预期格式 """
============ 状态定义 ============
class MultiAgentState(TypedDict): messages: Annotated[list, add_messages] task: str current_agent: str results: dict next_step: str
============ 代理定义 ============
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
主管代理
def supervisor_agent(state: MultiAgentState) -> dict: """主管代理:负责任务分解和调度""" supervisor_prompt = """你是一个项目主管,负责协调多个专业代理完成软件开发任务。 你需要决定将任务分配给哪个代理: - code_reviewer: 负责代码审查和质量分析 - test_writer: 负责编写单元测试 - doc_writer: 负责编写文档 - FINISH: 所有任务已完成
请根据当前进展,决定下一步应该由哪个代理处理,并说明原因。
只输出代理名称(code_reviewer/test_writer/doc_writer/FINISH)和简短说明。
"""
messages = [SystemMessage(content=supervisor_prompt)] + state["messages"]
response = llm.invoke(messages)
# 解析主管决策
content = response.content.lower()
if "code_reviewer" in content:
next_agent = "code_reviewer"
elif "test_writer" in content:
next_agent = "test_writer"
elif "doc_writer" in content:
next_agent = "doc_writer"
else:
next_agent = "FINISH"
return {
"messages": [AIMessage(content=f"[主管] {response.content}")],
"next_step": next_agent
}
代码审查代理
def code_reviewer_agent(state: MultiAgentState) -> dict: """代码审查代理""" # 模拟对任务描述中的代码进行审查 sample_code = """ def process_data(data): result = eval(data) # 这行代码有安全风险 return result """
review_result = code_analysis.invoke({"code": sample_code})
return {
"messages": [AIMessage(content=f"[代码审查员] 完成代码审查:\n{review_result}")],
"results": {**state.get("results", {}), "code_review": review_result},
"current_agent": "code_reviewer"
}
测试编写代理
def test_writer_agent(state: MultiAgentState) -> dict: """测试编写代理""" tests = generate_tests.invoke({ "function_name": "process_data", "description": "处理输入数据并返回结果" })
return {
"messages": [AIMessage(content=f"[测试工程师] 已生成单元测试:\n{tests}")],
"results": {**state.get("results", {}), "tests": tests},
"current_agent": "test_writer"
}
文档编写代理
def doc_writer_agent(state: MultiAgentState) -> dict: """文档编写代理""" docs = write_documentation.invoke({ "function_name": "process_data", "description": "处理输入数据并返回处理结果", "params": "- data (str): 待处理的数据字符串" })
return {
"messages": [AIMessage(content=f"[文档工程师] 已生成文档:\n{docs}")],
"results": {**state.get("results", {}), "documentation": docs},
"current_agent": "doc_writer"
}
def route_to_agent(state: MultiAgentState) -> str: """根据主管决策路由到对应代理""" next_step = state.get("next_step", "FINISH")
routing_map = {
"code_reviewer": "code_reviewer",
"test_writer": "test_writer",
"doc_writer": "doc_writer",
"FINISH": END
}
return routing_map.get(next_step, END)
============ 构建多代理图 ============
workflow = StateGraph(MultiAgentState)
添加所有节点
workflow.add_node("supervisor", supervisor_agent) workflow.add_node("code_reviewer", code_reviewer_agent) workflow.add_node("test_writer", test_writer_agent) workflow.add_node("doc_writer", doc_writer_agent)
设置入口
workflow.set_entry_point("supervisor")
主管路由
workflow.add_conditional_edges( "supervisor", route_to_agent, { "code_reviewer": "code_reviewer", "test_writer": "test_writer", "doc_writer": "doc_writer", END: END } )
所有工作者完成后回到主管
for worker in ["code_reviewer", "test_writer", "doc_writer"]: workflow.add_edge(worker, "supervisor")
multi_agent_app = workflow.compile()
============ 运行测试 ============
initial_state = { "messages": [HumanMessage(content="请对 process_data 函数进行代码审查、编写测试用例并生成文档")], "task": "软件开发流程自动化", "current_agent": "", "results": {}, "next_step": "" }
print("启动多代理系统...\n") step_count = 0 for step in multi_agent_app.stream(initial_state): step_count += 1 if step_count > 10: # 防止无限循环 break for node, output in step.items(): if "messages" in output: last_msg = output["messages"][-1] print(f"📍 {node}: {last_msg.content[:300]}") print("-" * 50)
---
## 八、最佳实践与常见问题
### 8.1 状态设计原则
良好的状态设计是 LangGraph 应用成功的基础:
**原则一:最小化状态**
只在状态中存储真正需要跨节点共享的信息,避免状态过于臃肿。
**原则二:使用合适的数据类型**
- 列表类型配合 `Annotated` 和 `add_messages` 等归约函数
- 字典类型用于存储键值对数据
- 简单类型(str, int, bool)用于标志和计数器
**原则三:明确更新语义**
理解 LangGraph 的状态更新机制:默认情况下,返回字典中的值会**覆盖**原有值,而带有归约函数(如 `add_messages`)的字段会**追加或合并**。
### 8.2 错误处理策略
```python
def robust_node(state: AgentState) -> dict:
"""具备错误处理能力的节点"""
try:
result = risky_operation(state)
return {
"messages": [AIMessage(content=result)],
"error": None,
"retry_count": 0
}
except Exception as e:
retry_count = state.get("retry_count", 0)
error_msg = f"执行失败(第 {retry_count + 1} 次尝试): {str(e)}"
return {
"messages": [AIMessage(content=error_msg)],
"error": str(e),
"retry_count": retry_count + 1
}
def handle_error(state: AgentState) -> str:
"""错误路由决策"""
if state.get("error") and state.get("retry_count", 0) < 3:
return "retry"
elif state.get("error"):
return "fallback"
else:
return "success"
8.3 调试技巧
# 技巧一:使用 stream 方法逐步观察执行过程
for step in app.stream(initial_state):
print(f"当前步骤: {list(step.keys())}")
for node, output in step.items():
print(f" 节点 {node} 输出: {output}")
# 技巧二:可视化图结构
from IPython.display import Image
graph_image = app.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(graph_image)
# 技巧三:打印 Mermaid 图表代码
print(app.get_graph().draw_mermaid())
# 技巧四:查看图的节点和边信息
graph = app.get_graph()
print("节点列表:", list(graph.nodes.keys()))
print("边列表:", [(e.source, e.target) for e in graph.edges])
8.4 性能优化建议
并行执行:对于相互独立的节点,使用
SendAPI 实现并行执行缓存工具结果:对于相同输入的工具调用,使用缓存避免重复执行
合理设置最大步数:始终设置循环执行的上限,防止无限循环
选择合适的 LLM:对于简单决策使用较小的模型,复杂推理才使用大模型
九、实战项目:构建智能代码助手
让我们综合运用所学知识,构建一个完整的智能代码助手应用:
from typing import TypedDict, Annotated, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
import ast
import subprocess
import sys
# ============ 工具定义 ============
@tool
def analyze_code_syntax(code: str) -> str:
"""检查 Python 代码的语法是否正确"""
try:
ast.parse(code)
return "✅ 代码语法检查通过,没有语法错误"
except SyntaxError as e:
return f"❌ 语法错误:第 {e.lineno} 行 - {e.msg}"
@tool
def explain_error(error_message: str) -> str:
"""解释常见的 Python 错误信息"""
error_explanations = {
"NameError": "变量或函数名未定义,请检查变量名是否正确,或是否已经初始化",
"TypeError": "类型错误,操作数的类型不匹配,请检查数据类型",
"IndexError": "列表/数组索引超出范围,请检查索引值是否合法",
"KeyError": "字典中不存在该键,请确认键名是否正确",
"AttributeError": "对象没有该属性或方法,请检查属性名是否正确",
"ImportError": "模块导入失败,请确认模块已安装或路径正确",
}
for error_type, explanation in error_explanations.items():
if error_type in error_message:
return f"错误类型:{error_type}\n解释:{explanation}"
return f"无法识别的错误类型,请提供更多上下文信息"
@tool
def suggest_refactoring(code: str) -> str:
"""提供代码重构建议"""
suggestions = []
lines = code.split('\n')
if len(lines) > 50:
suggestions.append("📋 建议:函数超过 50 行,考虑拆分为多个小函数")
if code.count('for') > 3:
suggestions.append("📋 建议:嵌套循环较多,考虑使用列表推导式或 map/filter")
if 'print(' in code and 'logging' not in code:
suggestions.append("📋 建议:生产代码中使用 logging 模块替代 print 语句")
if not suggestions:
return "✅ 代码结构良好,暂无重构建议"
return "\n".join(suggestions)
# ============ 状态定义 ============
class CodeAssistantState(TypedDict):
messages: Annotated[list, add_messages]
code_context: str
current_issue: str
analysis_result: str
solution_provided: bool
session_id: str
# ============ 节点定义 ============
tools = [analyze_code_syntax, explain_error, suggest_refactoring]
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
llm_with_tools = llm.bind_tools(tools)
ASSISTANT_SYSTEM_PROMPT = """你是一个专业的 Python 代码助手,擅长:
1. 分析和调试代码问题
2. 解释错误信息并提供解决方案
3. 提供代码优化和重构建议
4. 用清晰易懂的方式解释复杂概念
工作方式:
- 首先理解用户的问题
- 使用合适的工具分析代码
- 提供详细的解释和解决方案
- 给出可以直接使用的代码示例
请始终用中文回复,保持专业但友好的语气。
"""
def code_assistant_node(state: CodeAssistantState) -> dict:
"""代码助手主节点"""
messages = [SystemMessage(content=ASSISTANT_SYSTEM_PROMPT)] + state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
def tools_executor_node(state: CodeAssistantState) -> dict:
"""工具执行节点"""
from langchain_core.messages import ToolMessage
last_message = state["messages"][-1]
tool_map = {t.name: t for t in tools}
results = []
for tool_call in last_message.tool_calls:
tool_fn = tool_map.get(tool_call["name"])
if tool_fn:
try:
result = tool_fn.invoke(tool_call["args"])
results.append(ToolMessage(
content=str(result),
tool_call_id=tool_call["id"]
))
except Exception as e:
results.append(ToolMessage(
content=f"工具执行出错: {e}",
tool_call_id=tool_call["id"]
))
return {"messages": results}
def should_use_tools(state: CodeAssistantState) -> str:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "use_tools"
return "respond"
# ============ 构建图 ============
workflow = StateGraph(CodeAssistantState)
workflow.add_node("assistant", code_assistant_node)
workflow.add_node("tools", tools_executor_node)
workflow.set_entry_point("assistant")
workflow.add_conditional_edges(
"assistant",
should_use_tools,
{"use_tools": "tools", "respond": END}
)
workflow.add_edge("tools", "assistant")
checkpointer = MemorySaver()
code_assistant = workflow.compile(checkpointer=checkpointer)
# ============ 交互式运行 ============
def run_code_assistant():
"""运行交互式代码助手"""
session_id = "demo_session_001"
config = {"configurable": {"thread_id": session_id}}
print("=" * 60)
print("🤖 智能代码助手已启动")
print(" 输入 'quit' 退出,输入 'clear' 清空对话历史")
print("=" * 60)
while True:
user_input = input("\n👤 您: ").strip()
if user_input.lower() == 'quit':
print("👋 再见!")
break
if not user_input:
continue
initial_state = {
"messages": [HumanMessage(content=user_input)],
"code_context": "",
"current_issue": "",
"analysis_result": "",
"solution_provided": False,
"session_id": session_id
}
print("\n🤖 助手: ", end="", flush=True)
result = code_assistant.invoke(initial_state, config=config)
last_message = result["messages"][-1]
print(last_message.content)
# 运行助手
# run_code_assistant() # 取消注释以运行交互模式
# 单次测试
test_state = {
"messages": [HumanMessage(content="""
我的代码出现了这个错误:
def calculate_average(numbers): total = sum(numbers) return total / len(numbers)
result = calculate_average([]) print(result)
错误信息:ZeroDivisionError: division by zero
请帮我分析并修复这个问题。
""")],
"code_context": "",
"current_issue": "",
"analysis_result": "",
"solution_provided": False,
"session_id": "test_001"
}
config = {"configurable": {"thread_id": "test_001"}}
result = code_assistant.invoke(test_state, config=config)
print("助手回复:", result["messages"][-1].content)
十、总结与展望
10.1 本文知识点回顾
通过本文的学习,我们系统地掌握了 LangGraph 的核心内容:
10.2 LangGraph 的适用场景
LangGraph 特别适合以下场景:
复杂多步骤 AI 工作流:需要根据中间结果动态调整执行路径
长期运行的 AI 任务:需要状态持久化和断点续传能力
需要人工干预的流程:关键决策点需要人工审核和确认
多专业领域协作:不同领域的专业代理共同完成复杂任务
企业级 AI 应用:需要可观测性、可控性和可审计性
10.3 下一步学习建议
掌握了基础之后,建议进一步探索以下进阶主题:
LangGraph Cloud:学习如何将 LangGraph 应用部署到云端
LangSmith 集成:使用 LangSmith 进行工作流追踪、调试和评估
持久化存储:探索 PostgreSQL、Redis 等生产级检查点方案
流式输出:实现更好的用户体验,支持实时流式响应
自定义归约函数:设计适合业务需求的状态更新策略
子图(SubGraph):将复杂流程模块化,构建可复用的子工作流
10.4 结语
LangGraph 代表了 AI 应用开发的一个重要方向——从简单的"问答"走向复杂的"自主推理与行动"。它将图论的严谨性与 LLM 的智能性完美结合,为开发者提供了一个强大而灵活的工具箱。
随着 AI 技术的持续演进,LangGraph 这类编排框架将在企业 AI 应用中扮演越来越重要的角色。掌握它,不仅是技术能力的提升,更是对未来 AI 应用开发范式的深刻理解。
希望本文能够帮助你快速入门 LangGraph,并在实际项目中发挥它的强大威力。如果你在学习过程中有任何问题,欢迎参考 LangGraph 官方文档 或社区讨论。
本文所有代码示例均基于 LangGraph 最新稳定版本编写,如遇到版本兼容性问题,请以官方文档为准。