前言

随着大型语言模型(LLM)技术的飞速发展,越来越多的开发者开始探索如何将 AI 能力融入到复杂的业务流程中。然而,单纯的"问答式"调用已经无法满足日益复杂的应用需求——我们需要的是能够自主推理、动态决策、多步骤执行的智能系统。

LangGraph 正是在这样的背景下应运而生。作为 LangChain 生态系统中的重要组成部分,LangGraph 提供了一套基于图结构的框架,让开发者能够以直观、灵活的方式构建具备状态管理能力的多代理(Multi-Agent)应用程序。

本文将从零开始,系统性地介绍 LangGraph 的核心概念、安装配置、基础用法,并通过实际案例帮助你快速上手这一强大工具。


一、什么是 LangGraph?

1.1 基本概念

LangGraph 是由 LangChain 团队开发的一个开源框架,专门用于构建有状态的、多角色的 LLM 应用。它的核心思想是将 AI 工作流抽象为一张有向图(Directed Graph),其中:

  • 节点(Node):代表具体的计算单元,可以是 LLM 调用、工具执行、自定义函数等

  • 边(Edge):代表节点之间的流转关系,控制数据和控制流的走向

  • 状态(State):贯穿整个图执行过程的共享数据结构,记录当前工作流的完整上下文

与传统的链式(Chain)调用不同,LangGraph 支持循环(Cycles),这使得它能够实现真正的迭代推理——让 AI 能够"反思"自己的输出,并根据结果决定是否继续执行、调整策略或终止流程。

1.2 为什么需要 LangGraph?

在 LangGraph 出现之前,开发者通常使用以下方式构建 AI 应用:

方式

优点

局限性

简单 LLM 调用

实现简单、快速

无法处理复杂多步骤逻辑

LangChain Chains

链式组合灵活

不支持循环,状态管理困难

LangChain Agents

支持工具调用

可控性差,调试困难

自定义代码

完全可控

开发成本高,缺乏标准化

LangGraph 的出现填补了这一空白,它提供了:

  1. 精确的流程控制:开发者可以明确定义每个步骤的执行条件和流转逻辑

  2. 完善的状态管理:内置状态持久化机制,支持断点续传和人工干预

  3. 可视化支持:工作流图结构天然支持可视化,便于调试和理解

  4. 高度可扩展性:支持多代理协作,适合构建复杂的企业级应用

1.3 LangGraph 与 LangChain 的关系

很多初学者会混淆 LangGraph 和 LangChain 的关系。简单来说:

  • LangChain 提供了与 LLM 交互的基础组件(模型包装、提示模板、文档处理等)

  • LangGraph 在 LangChain 之上提供了编排层,专注于多步骤工作流的设计与执行

两者并非替代关系,而是相辅相成。LangGraph 内部可以使用 LangChain 的各类组件,而 LangChain 本身也在逐步集成 LangGraph 的能力。


二、核心概念详解

在动手编写代码之前,我们需要深入理解 LangGraph 的几个核心概念。

2.1 状态(State)

状态是 LangGraph 的灵魂。每一个 LangGraph 应用都需要定义一个状态类,它描述了整个工作流运行过程中需要追踪的所有信息。

状态通常使用 Python 的 TypedDict 或 Pydantic 模型来定义:

from typing import TypedDict, Annotated, List
from langgraph.graph import add_messages

class AgentState(TypedDict):
    # 消息历史列表,使用 add_messages 注解实现自动追加
    messages: Annotated[list, add_messages]
    # 当前任务描述
    task: str
    # 迭代次数计数器
    iteration_count: int
    # 最终结果
    final_result: str

这里有几个关键点需要注意:

  • Annotated:用于为字段添加额外的元数据,如更新策略

  • add_messages:LangGraph 内置的归约函数(Reducer),用于将新消息追加到消息列表,而非覆盖

  • 状态的字段可以在不同节点之间共享和修改

2.2 节点(Nodes)

节点是工作流中的执行单元,本质上就是一个 Python 函数,接收当前状态作为输入,返回更新后的状态部分:

def my_node(state: AgentState) -> dict:
    # 处理逻辑
    result = do_something(state["task"])
    # 返回需要更新的状态字段
    return {"final_result": result}

节点函数有以下特点:

  1. 输入必须是完整的状态对象

  2. 输出是一个字典,只需包含需要更新的字段

  3. 可以是普通的 Python 函数,也可以是异步函数(async def

  4. 可以包含任意复杂的逻辑:调用 LLM、执行数据库查询、调用外部 API 等

2.3 边(Edges)

边定义了节点之间的流转关系,LangGraph 支持三种类型的边:

普通边(Normal Edge)

直接连接两个节点,无条件流转:

graph.add_edge("node_a", "node_b")

条件边(Conditional Edge)

根据当前状态动态决定下一个节点:

def route_decision(state: AgentState) -> str:
    if state["iteration_count"] >= 3:
        return "end"
    elif state["final_result"]:
        return "output_node"
    else:
        return "retry_node"

graph.add_conditional_edges(
    "decision_node",
    route_decision,
    {
        "end": END,
        "output_node": "output_node",
        "retry_node": "retry_node"
    }
)

入口点和结束点

每个图需要明确指定起始节点和终止条件:

graph.set_entry_point("start_node")  # 设置入口
graph.add_edge("final_node", END)    # 连接到结束点

2.4 图(Graph)

图是 LangGraph 的顶层容器,负责将节点和边组合成一个可执行的工作流:

from langgraph.graph import StateGraph, END

# 创建图,指定状态类型
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("node_a", node_a_function)
workflow.add_node("node_b", node_b_function)

# 添加边
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

# 编译图
app = workflow.compile()

2.5 检查点(Checkpoints)

检查点是 LangGraph 的一个高级特性,用于持久化工作流状态。通过检查点,你可以:

  • 在工作流中途保存状态,支持断点续传

  • 实现"人在回路"(Human-in-the-Loop)模式,允许人工审核和干预

  • 支持工作流的历史回溯和状态查询

from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点
checkpointer = MemorySaver()

# 编译时传入检查点
app = workflow.compile(checkpointer=checkpointer)

# 执行时传入线程 ID(用于区分不同会话)
config = {"configurable": {"thread_id": "session_001"}}
result = app.invoke(initial_state, config=config)

三、环境安装与配置

3.1 安装依赖

LangGraph 的安装非常简单,使用 pip 即可完成:

# 安装 LangGraph 核心包
pip install langgraph

# 安装 LangChain 及相关依赖
pip install langchain langchain-openai

# 可选:安装 LangGraph CLI(用于可视化和调试)
pip install langgraph-cli

# 可选:安装持久化检查点支持
pip install langgraph-checkpoint-sqlite

3.2 配置 API 密钥

如果你使用 OpenAI 的模型,需要配置相应的 API 密钥:

import os

# 方式一:直接在代码中设置(不推荐用于生产环境)
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# 方式二:使用 .env 文件(推荐)
from dotenv import load_dotenv
load_dotenv()

创建 .env 文件:

OPENAI_API_KEY=your-openai-api-key
LANGCHAIN_API_KEY=your-langsmith-api-key  # 可选,用于追踪
LANGCHAIN_TRACING_V2=true                 # 可选,启用追踪

3.3 验证安装

运行以下代码验证安装是否成功:

import langgraph
print(f"LangGraph 版本: {langgraph.__version__}")

from langgraph.graph import StateGraph, END
print("LangGraph 安装成功!")

四、快速入门:构建第一个 LangGraph 应用

让我们从一个最简单的例子开始,构建一个能够进行多轮对话的聊天机器人。

4.1 基础聊天机器人

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

# 第一步:定义状态
class ChatState(TypedDict):
    messages: Annotated[list, add_messages]

# 第二步:初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

# 第三步:定义节点函数
def chatbot_node(state: ChatState) -> dict:
    """聊天机器人节点:调用 LLM 生成回复"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 第四步:构建图
workflow = StateGraph(ChatState)
workflow.add_node("chatbot", chatbot_node)
workflow.set_entry_point("chatbot")
workflow.add_edge("chatbot", END)

# 第五步:编译
app = workflow.compile()

# 第六步:运行
def chat(user_input: str, history: list = None) -> str:
    if history is None:
        history = []
    
    history.append(HumanMessage(content=user_input))
    
    result = app.invoke({"messages": history})
    
    ai_message = result["messages"][-1]
    return ai_message.content

# 测试
response = chat("你好!请介绍一下 LangGraph。")
print(response)

4.2 为聊天机器人添加工具

一个更实用的聊天机器人需要能够使用工具(如搜索、计算等)。让我们扩展上面的例子:

from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage

# 定义工具
@tool
def calculate(expression: str) -> str:
    """执行数学计算。输入一个数学表达式,返回计算结果。"""
    try:
        result = eval(expression)
        return f"计算结果: {result}"
    except Exception as e:
        return f"计算错误: {str(e)}"

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气信息(模拟数据)。"""
    # 实际应用中这里会调用真实的天气 API
    weather_data = {
        "北京": "晴天,温度 25°C,湿度 40%",
        "上海": "多云,温度 28°C,湿度 65%",
        "广州": "小雨,温度 32°C,湿度 80%"
    }
    return weather_data.get(city, f"暂无 {city} 的天气数据")

# 定义状态
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

# 初始化带工具的 LLM
tools = [calculate, get_weather]
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# 定义 Agent 节点
def agent_node(state: AgentState) -> dict:
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))

# 设置入口
workflow.set_entry_point("agent")

# 添加条件边:根据 LLM 是否调用工具决定流转方向
workflow.add_conditional_edges(
    "agent",
    tools_condition,  # 内置的条件函数:有工具调用则去 tools,否则结束
)

# 工具执行完后返回 agent 继续处理
workflow.add_edge("tools", "agent")

# 编译
app = workflow.compile()

# 测试
messages = [HumanMessage(content="帮我计算 123 * 456 + 789,并告诉我北京今天的天气")]
result = app.invoke({"messages": messages})

for message in result["messages"]:
    print(f"[{type(message).__name__}]: {message.content}")

五、进阶应用:构建 ReAct Agent

ReAct(Reasoning and Acting)是目前最流行的 AI Agent 模式之一,它让 LLM 交替进行推理(Reasoning)和行动(Acting),从而解决复杂问题。

5.1 ReAct Agent 的工作原理

ReAct 的执行流程如下:

思考(Thought)→ 行动(Action)→ 观察(Observation)→ 思考 → ... → 最终答案
  1. 思考:LLM 分析当前问题,决定下一步该做什么

  2. 行动:执行具体操作(调用工具、查询数据库等)

  3. 观察:获取行动的结果

  4. 循环:将观察结果加入上下文,继续思考下一步

5.2 用 LangGraph 实现 ReAct Agent

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
import json

# ============ 工具定义 ============
@tool
def search_web(query: str) -> str:
    """搜索互联网获取信息。"""
    # 模拟搜索结果
    mock_results = {
        "LangGraph": "LangGraph 是 LangChain 团队开发的图结构 AI 工作流框架,支持状态管理和循环执行。",
        "Python": "Python 是一种高级编程语言,以简洁易读著称,广泛用于 AI、数据科学和 Web 开发。",
    }
    for key, value in mock_results.items():
        if key.lower() in query.lower():
            return value
    return f"未找到关于 '{query}' 的相关信息"

@tool
def read_file(filename: str) -> str:
    """读取指定文件的内容。"""
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return f"文件 '{filename}' 不存在"
    except Exception as e:
        return f"读取文件时发生错误: {str(e)}"

@tool  
def write_file(filename: str, content: str) -> str:
    """将内容写入指定文件。"""
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        return f"成功将内容写入文件 '{filename}'"
    except Exception as e:
        return f"写入文件时发生错误: {str(e)}"

# ============ 状态定义 ============
class ReActState(TypedDict):
    messages: Annotated[list, add_messages]
    step_count: int
    max_steps: int

# ============ 节点定义 ============
tools = [search_web, read_file, write_file]
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools(tools)

SYSTEM_PROMPT = """你是一个有用的 AI 助手,可以使用工具来完成任务。
请按照以下步骤工作:
1. 仔细分析用户的请求
2. 决定是否需要使用工具
3. 如果需要,调用适当的工具获取信息
4. 基于工具返回的结果,继续分析或提供最终答案
5. 确保你的回答准确、完整

可用工具:search_web(搜索信息)、read_file(读取文件)、write_file(写入文件)
"""

def agent_node(state: ReActState) -> dict:
    """Agent 推理节点"""
    messages = state["messages"]
    
    # 如果没有系统消息,添加系统提示
    if not messages or not isinstance(messages[0], SystemMessage):
        messages = [SystemMessage(content=SYSTEM_PROMPT)] + messages
    
    response = llm_with_tools.invoke(messages)
    
    return {
        "messages": [response],
        "step_count": state.get("step_count", 0) + 1
    }

def tool_node(state: ReActState) -> dict:
    """工具执行节点"""
    from langchain_core.messages import ToolMessage
    
    last_message = state["messages"][-1]
    tool_results = []
    
    # 创建工具映射
    tool_map = {t.name: t for t in tools}
    
    # 执行所有工具调用
    for tool_call in last_message.tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]
        
        if tool_name in tool_map:
            try:
                result = tool_map[tool_name].invoke(tool_args)
                tool_results.append(
                    ToolMessage(
                        content=str(result),
                        tool_call_id=tool_call["id"]
                    )
                )
            except Exception as e:
                tool_results.append(
                    ToolMessage(
                        content=f"工具执行错误: {str(e)}",
                        tool_call_id=tool_call["id"]
                    )
                )
    
    return {"messages": tool_results}

def should_continue(state: ReActState) -> Literal["tools", "end"]:
    """决策函数:判断是否继续执行工具"""
    last_message = state["messages"][-1]
    step_count = state.get("step_count", 0)
    max_steps = state.get("max_steps", 10)
    
    # 超过最大步数,强制结束
    if step_count >= max_steps:
        return "end"
    
    # 如果最后一条消息包含工具调用,继续执行
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    
    return "end"

# ============ 构建图 ============
workflow = StateGraph(ReActState)

workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END}
)

workflow.add_edge("tools", "agent")

react_app = workflow.compile()

# ============ 运行测试 ============
initial_state = {
    "messages": [HumanMessage(content="请搜索关于 LangGraph 的信息,并将结果保存到 langgraph_info.txt 文件中")],
    "step_count": 0,
    "max_steps": 10
}

print("开始执行 ReAct Agent...\n")
for step in react_app.stream(initial_state):
    for node_name, node_output in step.items():
        print(f"【节点:{node_name}】")
        if "messages" in node_output:
            last_msg = node_output["messages"][-1]
            print(f"  类型: {type(last_msg).__name__}")
            if hasattr(last_msg, 'content') and last_msg.content:
                print(f"  内容: {last_msg.content[:200]}...")
        print()

六、人在回路(Human-in-the-Loop)

在某些关键场景中,我们希望 AI 在执行重要操作之前先获得人工确认。LangGraph 的检查点机制完美支持这一需求。

6.1 实现人工审批流程

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage

class ApprovalState(TypedDict):
    messages: Annotated[list, add_messages]
    pending_action: str
    approved: bool
    execution_result: str

def analyze_request(state: ApprovalState) -> dict:
    """分析请求,生成待执行的操作"""
    # 这里模拟 LLM 分析
    user_message = state["messages"][-1].content
    
    # 假设 LLM 决定需要删除某个文件
    pending_action = f"准备执行操作:删除文件 important_data.csv(基于用户请求:{user_message})"
    
    return {
        "messages": [AIMessage(content=f"我分析了您的请求,需要执行以下操作:\n{pending_action}\n\n请确认是否继续?")],
        "pending_action": pending_action,
        "approved": False
    }

def execute_action(state: ApprovalState) -> dict:
    """执行已批准的操作"""
    if state.get("approved", False):
        # 执行实际操作
        result = f"操作已成功执行:{state['pending_action']}"
    else:
        result = "操作已被取消"
    
    return {
        "messages": [AIMessage(content=result)],
        "execution_result": result
    }

def check_approval(state: ApprovalState) -> str:
    """检查是否已获得批准"""
    if state.get("approved", False):
        return "execute"
    else:
        return "wait"  # 等待人工审批

# 构建图
workflow = StateGraph(ApprovalState)
workflow.add_node("analyze", analyze_request)
workflow.add_node("execute", execute_action)

workflow.set_entry_point("analyze")
workflow.add_conditional_edges(
    "analyze",
    check_approval,
    {"execute": "execute", "wait": END}  # wait 时暂停,等待外部输入
)
workflow.add_edge("execute", END)

# 使用检查点编译
checkpointer = MemorySaver()
app = workflow.compile(
    checkpointer=checkpointer,
    interrupt_before=["execute"]  # 在执行节点前中断,等待人工确认
)

# 运行示例
config = {"configurable": {"thread_id": "approval_flow_001"}}

# 第一步:发起请求
print("=== 发起请求 ===")
result = app.invoke(
    {
        "messages": [HumanMessage(content="请清理过期的数据文件")],
        "approved": False,
        "pending_action": "",
        "execution_result": ""
    },
    config=config
)
print(f"AI 回复: {result['messages'][-1].content}\n")

# 获取当前状态(此时工作流已暂停)
current_state = app.get_state(config)
print(f"当前待执行操作: {current_state.values.get('pending_action', '无')}\n")

# 第二步:人工审批(更新状态)
print("=== 人工审批:批准操作 ===")
app.update_state(config, {"approved": True})

# 第三步:继续执行
print("=== 继续执行 ===")
final_result = app.invoke(None, config=config)
print(f"执行结果: {final_result['execution_result']}")

七、多代理系统(Multi-Agent System)

对于复杂任务,单个代理往往不够用。LangGraph 支持构建多个专业化代理协同工作的系统。

7.1 主管-工作者模式(Supervisor-Worker Pattern)

from typing import TypedDict, Annotated, List, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool

# ============ 专业工具 ============
@tool
def code_analysis(code: str) -> str:
    """分析代码质量和潜在问题"""
    # 模拟代码分析
    issues = []
    if "eval(" in code:
        issues.append("安全风险:使用了 eval() 函数")
    if len(code.split('\n')) > 100:
        issues.append("建议:函数过长,考虑拆分")
    if not issues:
        return "代码分析完成:未发现明显问题"
    return f"代码分析完成,发现以下问题:\n" + "\n".join(f"- {i}" for i in issues)

@tool
def generate_tests(function_name: str, description: str) -> str:
    """为指定函数生成单元测试"""
    return f"""
# 为 {function_name} 生成的单元测试
import pytest

class Test{function_name.capitalize()}:
    def test_basic_functionality(self):
        \"\"\"测试基本功能: {description}\"\"\"
        # TODO: 实现测试逻辑
        pass
    
    def test_edge_cases(self):
        \"\"\"测试边界情况\"\"\"
        # TODO: 实现边界测试
        pass
    
    def test_error_handling(self):
        \"\"\"测试错误处理\"\"\"
        # TODO: 实现错误测试
        pass
"""

@tool
def write_documentation(function_name: str, description: str, params: str) -> str:
    """生成函数文档"""
    return f"""
## {function_name}

**描述**: {description}

**参数**:
{params}

**返回值**: 根据函数逻辑返回相应结果

**使用示例**:
```python
result = {function_name}(...)

注意事项: 请确保输入参数符合预期格式 """

============ 状态定义 ============

class MultiAgentState(TypedDict): messages: Annotated[list, add_messages] task: str current_agent: str results: dict next_step: str

============ 代理定义 ============

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

主管代理

def supervisor_agent(state: MultiAgentState) -> dict: """主管代理:负责任务分解和调度""" supervisor_prompt = """你是一个项目主管,负责协调多个专业代理完成软件开发任务。 你需要决定将任务分配给哪个代理: - code_reviewer: 负责代码审查和质量分析 - test_writer: 负责编写单元测试 - doc_writer: 负责编写文档 - FINISH: 所有任务已完成

请根据当前进展,决定下一步应该由哪个代理处理,并说明原因。
只输出代理名称(code_reviewer/test_writer/doc_writer/FINISH)和简短说明。
"""

messages = [SystemMessage(content=supervisor_prompt)] + state["messages"]
response = llm.invoke(messages)

# 解析主管决策
content = response.content.lower()
if "code_reviewer" in content:
    next_agent = "code_reviewer"
elif "test_writer" in content:
    next_agent = "test_writer"
elif "doc_writer" in content:
    next_agent = "doc_writer"
else:
    next_agent = "FINISH"

return {
    "messages": [AIMessage(content=f"[主管] {response.content}")],
    "next_step": next_agent
}

代码审查代理

def code_reviewer_agent(state: MultiAgentState) -> dict: """代码审查代理""" # 模拟对任务描述中的代码进行审查 sample_code = """ def process_data(data): result = eval(data) # 这行代码有安全风险 return result """

review_result = code_analysis.invoke({"code": sample_code})

return {
    "messages": [AIMessage(content=f"[代码审查员] 完成代码审查:\n{review_result}")],
    "results": {**state.get("results", {}), "code_review": review_result},
    "current_agent": "code_reviewer"
}

测试编写代理

def test_writer_agent(state: MultiAgentState) -> dict: """测试编写代理""" tests = generate_tests.invoke({ "function_name": "process_data", "description": "处理输入数据并返回结果" })

return {
    "messages": [AIMessage(content=f"[测试工程师] 已生成单元测试:\n{tests}")],
    "results": {**state.get("results", {}), "tests": tests},
    "current_agent": "test_writer"
}

文档编写代理

def doc_writer_agent(state: MultiAgentState) -> dict: """文档编写代理""" docs = write_documentation.invoke({ "function_name": "process_data", "description": "处理输入数据并返回处理结果", "params": "- data (str): 待处理的数据字符串" })

return {
    "messages": [AIMessage(content=f"[文档工程师] 已生成文档:\n{docs}")],
    "results": {**state.get("results", {}), "documentation": docs},
    "current_agent": "doc_writer"
}

def route_to_agent(state: MultiAgentState) -> str: """根据主管决策路由到对应代理""" next_step = state.get("next_step", "FINISH")

routing_map = {
    "code_reviewer": "code_reviewer",
    "test_writer": "test_writer",
    "doc_writer": "doc_writer",
    "FINISH": END
}

return routing_map.get(next_step, END)

============ 构建多代理图 ============

workflow = StateGraph(MultiAgentState)

添加所有节点

workflow.add_node("supervisor", supervisor_agent) workflow.add_node("code_reviewer", code_reviewer_agent) workflow.add_node("test_writer", test_writer_agent) workflow.add_node("doc_writer", doc_writer_agent)

设置入口

workflow.set_entry_point("supervisor")

主管路由

workflow.add_conditional_edges( "supervisor", route_to_agent, { "code_reviewer": "code_reviewer", "test_writer": "test_writer", "doc_writer": "doc_writer", END: END } )

所有工作者完成后回到主管

for worker in ["code_reviewer", "test_writer", "doc_writer"]: workflow.add_edge(worker, "supervisor")

multi_agent_app = workflow.compile()

============ 运行测试 ============

initial_state = { "messages": [HumanMessage(content="请对 process_data 函数进行代码审查、编写测试用例并生成文档")], "task": "软件开发流程自动化", "current_agent": "", "results": {}, "next_step": "" }

print("启动多代理系统...\n") step_count = 0 for step in multi_agent_app.stream(initial_state): step_count += 1 if step_count > 10: # 防止无限循环 break for node, output in step.items(): if "messages" in output: last_msg = output["messages"][-1] print(f"📍 {node}: {last_msg.content[:300]}") print("-" * 50)


---

## 八、最佳实践与常见问题

### 8.1 状态设计原则

良好的状态设计是 LangGraph 应用成功的基础:

**原则一:最小化状态**
只在状态中存储真正需要跨节点共享的信息,避免状态过于臃肿。

**原则二:使用合适的数据类型**
- 列表类型配合 `Annotated` 和 `add_messages` 等归约函数
- 字典类型用于存储键值对数据
- 简单类型(str, int, bool)用于标志和计数器

**原则三:明确更新语义**
理解 LangGraph 的状态更新机制:默认情况下,返回字典中的值会**覆盖**原有值,而带有归约函数(如 `add_messages`)的字段会**追加或合并**。

### 8.2 错误处理策略

```python
def robust_node(state: AgentState) -> dict:
    """具备错误处理能力的节点"""
    try:
        result = risky_operation(state)
        return {
            "messages": [AIMessage(content=result)],
            "error": None,
            "retry_count": 0
        }
    except Exception as e:
        retry_count = state.get("retry_count", 0)
        error_msg = f"执行失败(第 {retry_count + 1} 次尝试): {str(e)}"
        
        return {
            "messages": [AIMessage(content=error_msg)],
            "error": str(e),
            "retry_count": retry_count + 1
        }

def handle_error(state: AgentState) -> str:
    """错误路由决策"""
    if state.get("error") and state.get("retry_count", 0) < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    else:
        return "success"

8.3 调试技巧

# 技巧一:使用 stream 方法逐步观察执行过程
for step in app.stream(initial_state):
    print(f"当前步骤: {list(step.keys())}")
    for node, output in step.items():
        print(f"  节点 {node} 输出: {output}")

# 技巧二:可视化图结构
from IPython.display import Image
graph_image = app.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
    f.write(graph_image)

# 技巧三:打印 Mermaid 图表代码
print(app.get_graph().draw_mermaid())

# 技巧四:查看图的节点和边信息
graph = app.get_graph()
print("节点列表:", list(graph.nodes.keys()))
print("边列表:", [(e.source, e.target) for e in graph.edges])

8.4 性能优化建议

  1. 并行执行:对于相互独立的节点,使用 Send API 实现并行执行

  2. 缓存工具结果:对于相同输入的工具调用,使用缓存避免重复执行

  3. 合理设置最大步数:始终设置循环执行的上限,防止无限循环

  4. 选择合适的 LLM:对于简单决策使用较小的模型,复杂推理才使用大模型


九、实战项目:构建智能代码助手

让我们综合运用所学知识,构建一个完整的智能代码助手应用:

from typing import TypedDict, Annotated, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
import ast
import subprocess
import sys

# ============ 工具定义 ============
@tool
def analyze_code_syntax(code: str) -> str:
    """检查 Python 代码的语法是否正确"""
    try:
        ast.parse(code)
        return "✅ 代码语法检查通过,没有语法错误"
    except SyntaxError as e:
        return f"❌ 语法错误:第 {e.lineno} 行 - {e.msg}"

@tool
def explain_error(error_message: str) -> str:
    """解释常见的 Python 错误信息"""
    error_explanations = {
        "NameError": "变量或函数名未定义,请检查变量名是否正确,或是否已经初始化",
        "TypeError": "类型错误,操作数的类型不匹配,请检查数据类型",
        "IndexError": "列表/数组索引超出范围,请检查索引值是否合法",
        "KeyError": "字典中不存在该键,请确认键名是否正确",
        "AttributeError": "对象没有该属性或方法,请检查属性名是否正确",
        "ImportError": "模块导入失败,请确认模块已安装或路径正确",
    }
    
    for error_type, explanation in error_explanations.items():
        if error_type in error_message:
            return f"错误类型:{error_type}\n解释:{explanation}"
    
    return f"无法识别的错误类型,请提供更多上下文信息"

@tool
def suggest_refactoring(code: str) -> str:
    """提供代码重构建议"""
    suggestions = []
    lines = code.split('\n')
    
    if len(lines) > 50:
        suggestions.append("📋 建议:函数超过 50 行,考虑拆分为多个小函数")
    
    if code.count('for') > 3:
        suggestions.append("📋 建议:嵌套循环较多,考虑使用列表推导式或 map/filter")
    
    if 'print(' in code and 'logging' not in code:
        suggestions.append("📋 建议:生产代码中使用 logging 模块替代 print 语句")
    
    if not suggestions:
        return "✅ 代码结构良好,暂无重构建议"
    
    return "\n".join(suggestions)

# ============ 状态定义 ============
class CodeAssistantState(TypedDict):
    messages: Annotated[list, add_messages]
    code_context: str
    current_issue: str
    analysis_result: str
    solution_provided: bool
    session_id: str

# ============ 节点定义 ============
tools = [analyze_code_syntax, explain_error, suggest_refactoring]
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
llm_with_tools = llm.bind_tools(tools)

ASSISTANT_SYSTEM_PROMPT = """你是一个专业的 Python 代码助手,擅长:
1. 分析和调试代码问题
2. 解释错误信息并提供解决方案
3. 提供代码优化和重构建议
4. 用清晰易懂的方式解释复杂概念

工作方式:
- 首先理解用户的问题
- 使用合适的工具分析代码
- 提供详细的解释和解决方案
- 给出可以直接使用的代码示例

请始终用中文回复,保持专业但友好的语气。
"""

def code_assistant_node(state: CodeAssistantState) -> dict:
    """代码助手主节点"""
    messages = [SystemMessage(content=ASSISTANT_SYSTEM_PROMPT)] + state["messages"]
    response = llm_with_tools.invoke(messages)
    
    return {"messages": [response]}

def tools_executor_node(state: CodeAssistantState) -> dict:
    """工具执行节点"""
    from langchain_core.messages import ToolMessage
    
    last_message = state["messages"][-1]
    tool_map = {t.name: t for t in tools}
    results = []
    
    for tool_call in last_message.tool_calls:
        tool_fn = tool_map.get(tool_call["name"])
        if tool_fn:
            try:
                result = tool_fn.invoke(tool_call["args"])
                results.append(ToolMessage(
                    content=str(result),
                    tool_call_id=tool_call["id"]
                ))
            except Exception as e:
                results.append(ToolMessage(
                    content=f"工具执行出错: {e}",
                    tool_call_id=tool_call["id"]
                ))
    
    return {"messages": results}

def should_use_tools(state: CodeAssistantState) -> str:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "use_tools"
    return "respond"

# ============ 构建图 ============
workflow = StateGraph(CodeAssistantState)

workflow.add_node("assistant", code_assistant_node)
workflow.add_node("tools", tools_executor_node)

workflow.set_entry_point("assistant")

workflow.add_conditional_edges(
    "assistant",
    should_use_tools,
    {"use_tools": "tools", "respond": END}
)
workflow.add_edge("tools", "assistant")

checkpointer = MemorySaver()
code_assistant = workflow.compile(checkpointer=checkpointer)

# ============ 交互式运行 ============
def run_code_assistant():
    """运行交互式代码助手"""
    session_id = "demo_session_001"
    config = {"configurable": {"thread_id": session_id}}
    
    print("=" * 60)
    print("🤖 智能代码助手已启动")
    print("   输入 'quit' 退出,输入 'clear' 清空对话历史")
    print("=" * 60)
    
    while True:
        user_input = input("\n👤 您: ").strip()
        
        if user_input.lower() == 'quit':
            print("👋 再见!")
            break
        
        if not user_input:
            continue
        
        initial_state = {
            "messages": [HumanMessage(content=user_input)],
            "code_context": "",
            "current_issue": "",
            "analysis_result": "",
            "solution_provided": False,
            "session_id": session_id
        }
        
        print("\n🤖 助手: ", end="", flush=True)
        
        result = code_assistant.invoke(initial_state, config=config)
        
        last_message = result["messages"][-1]
        print(last_message.content)

# 运行助手
# run_code_assistant()  # 取消注释以运行交互模式

# 单次测试
test_state = {
    "messages": [HumanMessage(content="""
我的代码出现了这个错误:

def calculate_average(numbers): total = sum(numbers) return total / len(numbers)

result = calculate_average([]) print(result)

错误信息:ZeroDivisionError: division by zero
请帮我分析并修复这个问题。
""")],
    "code_context": "",
    "current_issue": "",
    "analysis_result": "",
    "solution_provided": False,
    "session_id": "test_001"
}

config = {"configurable": {"thread_id": "test_001"}}
result = code_assistant.invoke(test_state, config=config)
print("助手回复:", result["messages"][-1].content)

十、总结与展望

10.1 本文知识点回顾

通过本文的学习,我们系统地掌握了 LangGraph 的核心内容:

主题

关键知识点

基础概念

状态、节点、边、图的定义与作用

快速入门

简单聊天机器人、工具调用集成

进阶模式

ReAct Agent 的设计与实现

人机协作

检查点机制、中断和恢复、人工审批

多代理系统

主管-工作者模式、代理间协作

最佳实践

错误处理、调试技巧、性能优化

实战项目

完整的智能代码助手应用

10.2 LangGraph 的适用场景

LangGraph 特别适合以下场景:

  • 复杂多步骤 AI 工作流:需要根据中间结果动态调整执行路径

  • 长期运行的 AI 任务:需要状态持久化和断点续传能力

  • 需要人工干预的流程:关键决策点需要人工审核和确认

  • 多专业领域协作:不同领域的专业代理共同完成复杂任务

  • 企业级 AI 应用:需要可观测性、可控性和可审计性

10.3 下一步学习建议

掌握了基础之后,建议进一步探索以下进阶主题:

  1. LangGraph Cloud:学习如何将 LangGraph 应用部署到云端

  2. LangSmith 集成:使用 LangSmith 进行工作流追踪、调试和评估

  3. 持久化存储:探索 PostgreSQL、Redis 等生产级检查点方案

  4. 流式输出:实现更好的用户体验,支持实时流式响应

  5. 自定义归约函数:设计适合业务需求的状态更新策略

  6. 子图(SubGraph):将复杂流程模块化,构建可复用的子工作流

10.4 结语

LangGraph 代表了 AI 应用开发的一个重要方向——从简单的"问答"走向复杂的"自主推理与行动"。它将图论的严谨性与 LLM 的智能性完美结合,为开发者提供了一个强大而灵活的工具箱。

随着 AI 技术的持续演进,LangGraph 这类编排框架将在企业 AI 应用中扮演越来越重要的角色。掌握它,不仅是技术能力的提升,更是对未来 AI 应用开发范式的深刻理解。

希望本文能够帮助你快速入门 LangGraph,并在实际项目中发挥它的强大威力。如果你在学习过程中有任何问题,欢迎参考 LangGraph 官方文档 或社区讨论。


本文所有代码示例均基于 LangGraph 最新稳定版本编写,如遇到版本兼容性问题,请以官方文档为准。