轨道交通 AI Agent 智能运营平台 技术方案
一、整体架构全景
┌──────────────────────────────────────────────────────────────────────┐
│ 业务应用层 │
│ ┌──────────────────┐ ┌──────────────────┐ ┌───────────────────┐ │
│ │ 应急预案自动匹配 │ │ 综合告警智能核查 │ │ 规章制度智能问答 │ │
│ └────────┬─────────┘ └────────┬─────────┘ └────────┬──────────┘ │
└───────────┼─────────────────────┼─────────────────────┼─────────────┘
│ │ │
┌───────────▼─────────────────────▼─────────────────────▼─────────────┐
│ AI Agent 编排层(LangGraph) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────────────┐ │
│ │ 应急预案 Agent │ │ 告警核查 Agent │ │ 自动化工作流 Agent │ │
│ │ 多步推理 + 循环 │ │ 多源融合 + 研判 │ │ 端到端业务流程编排 │ │
│ └─────────────────┘ └─────────────────┘ └──────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
│ │ │
┌───────────▼─────────────────────▼─────────────────────▼─────────────┐
│ 能力中台层 │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ ┌──────────┐ │
│ │ RAG 检索引擎 │ │ LLM 统一网关 │ │ Tool 工具集 │ │ Memory │ │
│ │ 混合检索召回 │ │ 模型统一调度 │ │ 函数/API调用 │ │ 状态持久 │ │
│ └──────────────┘ └──────────────┘ └─────────────┘ └──────────┘ │
└──────────────────────────────────────────────────────────────────────┘
│
┌───────────▼──────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ ┌───────────────┐ ┌──────────────┐ ┌──────────┐ ┌────────────┐ │
│ │ Milvus 向量库 │ │ PostgreSQL │ │ Redis │ │ MinIO/OSS │ │
│ │ 语义检索核心 │ │ 结构化数据 │ │ 热点缓存 │ │ 原始文档 │ │
│ └───────────────┘ └──────────────┘ └──────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
二、AI Agent 核心系统设计
2.1 Agent 整体设计原则
设计原则
├── 可观测 每个节点输入输出全链路可追踪(LangSmith)
├── 可恢复 Checkpoint 持久化,故障后从断点继续执行
├── 可介入 关键决策节点支持 Human-in-the-Loop 人工审核
├── 可解释 每步推理过程强制输出依据,结果可溯源
└── 可扩展 节点解耦,新业务场景复用已有节点能力
2.2 应急预案自动匹配 Agent
业务流程设计
告警事件输入
│
▼
┌─────────────┐ 信息不足
│ 事件解析节点 │ ──────────────▶ 追问澄清 ──┐
│ 类型/等级/位置│ │
└──────┬──────┘ ◀───────────────────────────┘
│ 信息完整
▼
┌─────────────┐
│ 意图分类节点 │ 设备故障 / 人员事件 / 自然灾害 / 运营中断
└──────┬──────┘
│
▼
┌─────────────┐
│ 预案检索节点 │ Milvus 语义检索 + 类型过滤
└──────┬──────┘
│
▼
┌─────────────┐
│ 预案排序节点 │ LLM 重排序 + 历史命中率加权
└──────┬──────┘
│
▼
┌─────────────┐
│ 处置建议生成 │ 结构化输出处置步骤 + 责任人 + 时限
└──────┬──────┘
│
▼
┌─────────────┐ 拒绝/修改
│ Human Review │ ──────────────▶ 返回排序节点重新生成
│ 人工审核点 │
└──────┬──────┘ 确认
│
▼
结果输出 + 存档
核心代码实现
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, List, Optional
# ── 1. 定义 Agent 共享状态 Schema ─────────────────────────────────────
class EmergencyAgentState(TypedDict):
alarm_raw: str # 原始告警文本
event_type: Optional[str] # 解析后事件类型
event_level: Optional[str] # 告警等级 红/橙/黄/蓝
event_location: Optional[str] # 事发位置
retrieved_plans: List[dict] # 检索到的预案列表
ranked_plans: List[dict] # 重排后的预案
suggestion: Optional[str] # 生成的处置建议
human_feedback: Optional[str] # 人工反馈意见
is_approved: bool # 人工审核结果
retry_count: int # 重试次数(防无限循环)
trace_log: List[str] # 每步执行日志(可解释性)
# ── 2. 定义各功能节点 ─────────────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def event_parser_node(state: EmergencyAgentState) -> dict:
"""事件解析:提取类型、等级、位置"""
prompt = ChatPromptTemplate.from_template("""
你是轨道交通应急处置专家,请解析以下告警事件:
{alarm_raw}
请以 JSON 格式输出:
{{"event_type": "...", "event_level": "红/橙/黄/蓝", "event_location": "..."}}
""")
result = (prompt | llm).invoke({"alarm_raw": state["alarm_raw"]})
parsed = parse_json(result.content)
return {
"event_type": parsed.get("event_type"),
"event_level": parsed.get("event_level"),
"event_location": parsed.get("event_location"),
"trace_log": state["trace_log"] + [f"[解析] 类型={parsed.get('event_type')}"]
}
def plan_retrieval_node(state: EmergencyAgentState) -> dict:
"""从 Milvus 检索相关预案"""
plans = milvus_retriever.search(
query=f"{state['event_type']} {state['alarm_raw']}",
filters={
"doc_type": "emergency_plan",
"event_type": state["event_type"],
"is_active": True
},
top_k=10
)
return {
"retrieved_plans": plans,
"trace_log": state["trace_log"] + [f"[检索] 召回 {len(plans)} 条预案"]
}
def plan_ranking_node(state: EmergencyAgentState) -> dict:
"""LLM 重排序 + 历史命中率加权"""
reranked = reranker.rerank(
query=state["alarm_raw"],
documents=state["retrieved_plans"],
top_k=3
)
return {
"ranked_plans": reranked,
"trace_log": state["trace_log"] + [f"[排序] Top1={reranked[0]['title']}"]
}
def suggestion_generator_node(state: EmergencyAgentState) -> dict:
"""生成结构化处置建议"""
context = "\n\n".join([p["content"] for p in state["ranked_plans"]])
prompt = ChatPromptTemplate.from_template("""
基于以下应急预案内容,针对当前事件生成处置建议:
事件:{alarm} 等级:{level}
参考预案:
{context}
请输出:1.立即处置步骤 2.责任人 3.处置时限 4.注意事项
""")
result = (prompt | llm).invoke({
"alarm": state["alarm_raw"],
"level": state["event_level"],
"context": context
})
return {"suggestion": result.content}
def human_review_node(state: EmergencyAgentState) -> dict:
"""人工审核占位节点(interrupt_before 在此暂停)"""
return state # 等待外部注入 human_feedback 和 is_approved
# ── 3. 条件路由函数 ───────────────────────────────────────────────────
def check_info_completeness(state: EmergencyAgentState) -> str:
"""判断事件信息是否完整"""
if not state.get("event_type") or not state.get("event_level"):
return "clarify" # 信息不足,追问
return "retrieve" # 信息完整,检索
def check_human_decision(state: EmergencyAgentState) -> str:
"""根据人工反馈决定下一步"""
if state.get("is_approved"):
return "output" # 审核通过,输出
if state.get("retry_count", 0) >= 3:
return "output" # 超过重试次数,强制输出
return "re_rank" # 拒绝,重新排序
# ── 4. 构建 LangGraph 图 ─────────────────────────────────────────────
graph = StateGraph(EmergencyAgentState)
graph.add_node("parser", event_parser_node)
graph.add_node("retrieval", plan_retrieval_node)
graph.add_node("ranking", plan_ranking_node)
graph.add_node("suggestion", suggestion_generator_node)
graph.add_node("human_review", human_review_node)
graph.set_entry_point("parser")
graph.add_conditional_edges("parser", check_info_completeness, {
"clarify": "parser", # 循环追问
"retrieve": "retrieval"
})
graph.add_edge("retrieval", "ranking")
graph.add_edge("ranking", "suggestion")
graph.add_edge("suggestion", "human_review")
graph.add_conditional_edges("human_review", check_human_decision, {
"re_rank": "ranking", # 重新排序
"output": END
})
# ── 5. 编译 + 持久化 Checkpoint ───────────────────────────────────────
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
emergency_agent = graph.compile(
checkpointer=checkpointer,
interrupt_before=["human_review"] # 到审核节点前暂停,等人工操作
)
2.3 综合告警智能核查 Agent
核查流程设计
┌───────────────────────────────────────────────────────────┐
│ 多源数据采集 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 设备状态 │ │ 历史告警 │ │ 实时传感器│ │ 视频流 │ │
│ │ API │ │ 数据库 │ │ 数据 │ │ 分析结果 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼─────────────┼─────────────┼─────────┘
│ │ │ │
└─────────────┴──────┬──────┴─────────────┘
│ 并行采集(异步 IO)
▼
┌────────────────┐
│ 数据融合节点 │ 结构化告警上下文
└───────┬────────┘
│
┌────────────┴────────────┐
▼ ▼
┌───────────────┐ ┌─────────────────┐
│ 有效性判断 │ │ 关联性分析 │
│ 误报 / 真实 │ │ 同区段聚合 │
└───────┬───────┘ │ 因果链推断 │
│ └────────┬────────┘
└──────────┬─────────────┘
│
┌────────▼────────┐
│ 等级研判节点 │ 红/橙/黄/蓝 四级定级
└────────┬────────┘
│
┌────────▼────────┐
│ 处置优先级排序 │ 影响范围 × 紧急程度
└────────┬────────┘
│
┌────────▼────────┐
│ 核查报告生成节点 │ 结构化输出 + 推送责任人
└─────────────────┘
关键能力实现
from langchain_core.tools import tool
import asyncio
# 异步并行工具调用:同时获取多源数据
@tool
async def get_device_status(device_id: str) -> dict:
"""获取设备实时状态"""
return await device_api.get_status(device_id)
@tool
async def get_historical_alarms(location: str, hours: int = 24) -> list:
"""获取历史告警记录"""
return await alarm_db.query(location=location, hours=hours)
@tool
async def get_sensor_data(sensor_ids: list) -> dict:
"""获取传感器实时数据"""
return await sensor_api.batch_get(sensor_ids)
async def data_fusion_node(state):
"""并行采集多源数据,降低整体延迟"""
results = await asyncio.gather(
get_device_status.ainvoke(state["device_id"]),
get_historical_alarms.ainvoke(state["location"]),
get_sensor_data.ainvoke(state["sensor_ids"]),
return_exceptions=True # 某路失败不影响其他路
)
return {"fused_context": merge_results(results)}
三、RAG 知识库检索系统
3.1 RAG 全链路设计
━━━━━━━━━━━━━━━━━━━ 离线索引构建 ━━━━━━━━━━━━━━━━━━━
原始文档(规章/预案/手册)
│
▼
① 文档解析 MinerU + DocLayout 版面分析
│ ├─ 正文段落提取
│ ├─ 表格结构化解析
│ └─ 图片 OCR 识别
▼
② 智能切块 按语义边界切分(非固定长度)
│ ├─ 规章制度:按条款号切分
│ ├─ 应急预案:按处置步骤切分
│ └─ 通用内容:语义段落切分
▼
③ 向量化 BGE-M3 双路编码
│ ├─ 稠密向量(dim=1024)语义理解
│ └─ 稀疏向量(BM25权重)关键词匹配
▼
④ 写入 Milvus 稠密 + 稀疏向量 + Metadata 一并写入
━━━━━━━━━━━━━━━━━━━ 在线检索召回 ━━━━━━━━━━━━━━━━━━━
用户查询
│
▼
① 查询改写 HyDE 假设文档扩展 + 多路查询生成
│
▼
② 混合检索 稠密检索(ANN余弦)+ 稀疏检索(BM25)
│ └─ RRF 倒排融合排序
▼
③ 粗排过滤 Metadata 过滤(文档类型/有效期/版本)
│
▼
④ 精排重排 BGE-Reranker-v2 交叉编码精排
│
▼
⑤ 上下文组装 召回片段 + 来源编号 + 置信度 → LLM 生成答案
3.2 智能切块策略
import re
from dataclasses import dataclass
from langchain_core.documents import Document
@dataclass
class ChunkConfig:
max_size: int = 512
overlap: int = 64
min_size: int = 100
class RailTransitSmartChunker:
"""轨道交通规章制度专用语义切块器"""
def chunk(self, document: Document) -> list[Document]:
doc_type = self._detect_doc_type(document)
strategy_map = {
"regulation": self._split_by_article, # 规章:按条款
"emergency_plan": self._split_by_procedure, # 预案:按步骤
"table": self._keep_table_intact, # 表格:整表保留
"general": self._semantic_split, # 通用:语义段落
}
chunks = strategy_map.get(doc_type, self._semantic_split)(document)
# 为每个 Chunk 注入 Metadata
return [self._enrich_metadata(chunk, document, i)
for i, chunk in enumerate(chunks)]
def _split_by_article(self, doc: Document) -> list[Document]:
"""按"第X条"切分,保留父级章节作为上下文"""
pattern = r"(第[一二三四五六七八九十百\d]+条[^\n]*)"
parts = re.split(pattern, doc.page_content)
chunks = []
for i in range(1, len(parts), 2):
title = parts[i].strip()
content = parts[i + 1].strip() if i + 1 < len(parts) else ""
if len(content) >= ChunkConfig.min_size:
chunks.append(Document(
page_content=f"{title}\n{content}",
metadata={"chunk_type": "article", "article_title": title}
))
return chunks
def _enrich_metadata(self, chunk: Document,
source: Document, idx: int) -> Document:
"""为 Chunk 注入完整 Metadata,支持 Milvus 过滤"""
chunk.metadata.update({
"doc_id": source.metadata.get("doc_id"),
"doc_type": source.metadata.get("doc_type"),
"category": source.metadata.get("category"), # 信号/供电/车辆
"version": source.metadata.get("version"),
"effective_date": source.metadata.get("effective_date"),
"is_active": source.metadata.get("is_active", True),
"chunk_index": idx,
"char_count": len(chunk.page_content),
})
return chunk
3.3 混合检索实现
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
class HybridRetriever(BaseRetriever):
"""稠密 + 稀疏混合检索,RRF 融合排序"""
def _get_relevant_documents(self, query: str) -> list[Document]:
# ① 查询改写:HyDE 生成假设文档扩展语义
hyde_doc = self._hyde_expansion(query)
sub_queries = self._generate_sub_queries(query) # 多路子查询
# ② 稠密检索(语义向量 ANN)
dense_hits = self.milvus.search(
vectors=[self.embedder.embed(hyde_doc)],
anns_field="dense_vec",
param={"metric_type": "COSINE", "params": {"ef": 256}},
filter=f"is_active == true",
limit=20
)
# ③ 稀疏检索(BM25 关键词)
sparse_hits = self.milvus.search(
vectors=[self.bm25.encode(query)],
anns_field="sparse_vec",
param={"metric_type": "IP"},
filter=f"is_active == true",
limit=20
)
# ④ RRF 倒排融合
fused = self._reciprocal_rank_fusion(
[dense_hits, sparse_hits], k=60
)
# ⑤ BGE-Reranker 精排(Top 20 → Top 5)
reranked = self.reranker.rerank(
query=query,
documents=fused[:20],
top_k=5
)
return reranked
def _reciprocal_rank_fusion(self, lists: list, k: int = 60) -> list:
scores = {}
for result_list in lists:
for rank, hit in enumerate(result_list):
doc_id = hit.id
scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
sorted_ids = sorted(scores, key=scores.get, reverse=True)
return [self._id_to_doc(doc_id) for doc_id in sorted_ids]
def _hyde_expansion(self, query: str) -> str:
"""HyDE:让 LLM 生成假设答案文档,增强语义检索"""
prompt = f"请生成一段回答以下问题的文档片段:\n{query}"
return self.llm.invoke(prompt).content
四、Milvus 向量数据库架构方案
4.1 集群部署架构
外部请求
│
负载均衡(Nginx)
│
┌───────────┴───────────┐
▼ ▼
Milvus Proxy Milvus Proxy ← 无状态,水平扩展
│
┌─────────┴──────────┐
▼ ▼
Root Coord Query Coord ← 元数据 + 查询协调
│ │
▼ ▼
Data Node × 3 Query Node × 4 ← 写入 / 检索节点
│
Index Node × 2 ← 异步构建索引
┌───────────────────────────────┐
│ 持久化存储层 │
│ MinIO etcd Pulsar/Kafka │
│ 原始数据 元数据 消息队列 │
└───────────────────────────────┘
4.2 Collection Schema 设计
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
def create_rail_knowledge_collection() -> Collection:
fields = [
# 主键与文本
FieldSchema("id", DataType.INT64, is_primary=True, auto_id=True),
FieldSchema("doc_id", DataType.VARCHAR, max_length=64),
FieldSchema("chunk_text", DataType.VARCHAR, max_length=4096),
# 向量字段
FieldSchema("dense_vec", DataType.FLOAT_VECTOR, dim=1024), # BGE-M3 稠密
FieldSchema("sparse_vec", DataType.SPARSE_FLOAT_VECTOR), # BM25 稀疏
# Metadata 过滤字段
FieldSchema("doc_type", DataType.VARCHAR, max_length=32),
FieldSchema("category", DataType.VARCHAR, max_length=64),
FieldSchema("version", DataType.VARCHAR, max_length=16),
FieldSchema("effective_date", DataType.INT64), # 时间戳
FieldSchema("expire_date", DataType.INT64), # 失效时间戳
FieldSchema("is_active", DataType.BOOL),
]
schema = CollectionSchema(fields, description="轨交知识库")
col = Collection(
name="rail_transit_knowledge",
schema=schema,
partition_key_field="doc_type" # 按文档类型分区,缩小检索范围
)
# ── 稠密向量索引:HNSW(高召回 + 低延迟)───────────────────────────
col.create_index("dense_vec", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 256}
})
# ── 稀疏向量索引:倒排索引 ────────────────────────────────────────
col.create_index("sparse_vec", {
"index_type": "SPARSE_INVERTED_INDEX",
"metric_type": "IP",
"params": {"drop_ratio_build": 0.2}
})
col.load()
return col
4.3 索引性能调优策略
4.4 数据生命周期管理
class MilvusDataManager:
"""向量数据生命周期管理:版本更新 / 失效清理"""
def update_document_version(self, doc_id: str, new_content: str,
new_version: str):
"""文档版本更新:逻辑失效旧版本 + 写入新版本"""
# Step 1:旧版本逻辑失效(不物理删除,保留历史)
self.collection.delete(
expr=f'doc_id == "{doc_id}" and version != "{new_version}"'
)
# Step 2:写入新版本 Chunks
new_chunks = self.chunker.chunk(new_content)
self._batch_insert(new_chunks, doc_id, new_version)
def cleanup_expired_docs(self):
"""定时清理过期文档(每日凌晨执行)"""
today_ts = int(datetime.now().timestamp())
self.collection.delete(
expr=f"expire_date > 0 and expire_date < {today_ts}"
)
def _batch_insert(self, chunks: list, doc_id: str,
version: str, batch_size: int = 2000):
"""批量写入,避免频繁小批次"""
for i in range(0, len(chunks), batch_size):
batch = chunks[i: i + batch_size]
embeddings = self.embedder.batch_embed([c.text for c in batch])
self.collection.insert([build_entities(batch, embeddings)])
五、LangChain 框架深度应用
5.1 LangChain 核心模块使用全景
LangChain 在本项目中的使用分工
│
├── langchain-core 基础抽象层
│ ├── BaseRetriever 自定义混合检索器基类
│ ├── BaseTool 自定义业务工具基类
│ ├── BaseMemory 对话历史管理基类
│ └── RunnableSequence LCEL 链式组合
│
├── langchain-openai LLM 接入
│ ├── ChatOpenAI GPT-4o 调用
│ └── OpenAIEmbeddings Embedding 备选
│
├── langchain-community 生态集成
│ ├── MilvusRetriever Milvus 向量检索集成
│ └── RedisCache LLM 调用结果缓存
│
└── langgraph Agent 编排(重点)
├── StateGraph 有状态图定义
├── Checkpoint 状态持久化
└── interrupt_before Human-in-the-Loop
5.2 LCEL(LangChain 表达式语言)构建 RAG Chain
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# ── RAG Prompt 模板 ────────────────────────────────────────────────────
RAG_PROMPT = ChatPromptTemplate.from_template("""
你是轨道交通运营专家,请根据以下参考内容回答问题。
如果参考内容中没有相关信息,请明确说明,不要编造。
参考内容:
{context}
问题:{question}
请给出准确、专业的回答,并标注参考来源:
""")
# ── 构建 RAG Chain(LCEL 链式组合)──────────────────────────────────────
def format_docs(docs: list) -> str:
"""将检索结果格式化为带来源的上下文"""
return "\n\n".join([
f"[来源{i+1}] {doc.metadata.get('doc_type','')}-"
f"{doc.metadata.get('article_title','')}\n{doc.page_content}"
for i, doc in enumerate(docs)
])
rag_chain = (
{
"context": hybrid_retriever | format_docs, # 并行:检索 + 格式化
"question": RunnablePassthrough() # 原始问题直传
}
| RAG_PROMPT # 注入 Prompt
| ChatOpenAI(model="gpt-4o", temperature=0) # LLM 生成
| StrOutputParser() # 解析输出
)
# ── 流式输出(提升用户体验)──────────────────────────────────────────────
async def stream_answer(question: str):
async for chunk in rag_chain.astream(question):
yield chunk # Server-Sent Events 推送到前端
5.3 自定义 Tool 工具集
from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field
# ── 工具输入 Schema(参数类型安全)──────────────────────────────────────
class AlarmQueryInput(BaseModel):
location: str = Field(description="告警发生的位置,如:1号线-人民广场站")
alarm_type: str = Field(description="告警类型:设备故障/人员事件/运营中断")
hours: int = Field(default=24, description="查询最近N小时的历史告警")
class PlanSearchInput(BaseModel):
event_description: str = Field(description="事件描述文本")
event_level: str = Field(description="事件等级:红/橙/黄/蓝")
top_k: int = Field(default=3, description="返回预案数量")
# ── 业务工具定义 ─────────────────────────────────────────────────────────
@tool(args_schema=AlarmQueryInput)
def query_historical_alarms(location: str, alarm_type: str,
hours: int = 24) -> list:
"""查询指定位置的历史告警记录,用于告警关联分析"""
return alarm_database.query(location=location,
alarm_type=alarm_type, hours=hours)
@tool(args_schema=PlanSearchInput)
def search_emergency_plans(event_description: str, event_level: str,
top_k: int = 3) -> list:
"""从应急预案知识库检索匹配预案"""
return milvus_retriever.search(
query=event_description,
filters={"doc_type": "emergency_plan"},
top_k=top_k
)
@tool
def get_realtime_device_status(device_ids: list[str]) -> dict:
"""获取设备实时运行状态"""
return device_api.batch_get_status(device_ids)
@tool
def send_emergency_notification(recipients: list[str],
content: str, level: str) -> bool:
"""发送应急通知给责任人"""
return notification_service.send(recipients, content, level)
# 工具注册
TOOL_REGISTRY = [
query_historical_alarms,
search_emergency_plans,
get_realtime_device_status,
send_emergency_notification,
]
5.4 LLM 调用优化
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
import langchain
# ── 语义缓存:相似问题命中缓存,降低 LLM 调用成本 ─────────────────────
langchain.llm_cache = RedisSemanticCache(
redis_url=REDIS_URL,
embedding=OpenAIEmbeddings(),
score_threshold=0.95 # 相似度 > 0.95 直接命中缓存
)
# ── Fallback:主模型失败自动切换备用模型 ────────────────────────────────
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
primary_llm = ChatOpenAI(model="gpt-4o", temperature=0)
fallback_llm = ChatAnthropic(model="claude-3-5-sonnet")
llm_with_fallback = primary_llm.with_fallbacks(
[fallback_llm],
exceptions_to_handle=(Exception,)
)
# ── 结构化输出:强制 JSON 格式,避免解析失败 ────────────────────────────
from pydantic import BaseModel
class AlarmAnalysisResult(BaseModel):
is_valid: bool # 是否有效告警
alarm_level: str # 研判等级
root_cause: str # 根本原因
action_items: list[str] # 处置建议
confidence: float # 置信度 0~1
structured_llm = llm_with_fallback.with_structured_output(AlarmAnalysisResult)
六、自动化工作流设计与编排
6.1 工作流分层架构
自动化工作流体系
│
├── L1 原子工作流 单一功能节点,不可再拆分
│ ├── 告警接收 & 解析
│ ├── 预案向量检索
│ ├── LLM 推理调用
│ ├── 通知消息发送
│ └── 结果存档写库
│
├── L2 组合工作流 多个原子流按业务逻辑编排
│ ├── 告警 → 核查 → 预案匹配 → 通知
│ ├── 设备故障 → 影响评估 → 调度优化
│ └── 规章查询 → 语义检索 → 答案生成
│
└── L3 业务场景工作流 端到端完整业务,含人工介入点
├── 应急预案全流程自动处置
├── 综合告警全流程核查研判
└── 规章合规全流程检查
6.2 工作流可复用基类
from abc import ABC, abstractmethod
from uuid import uuid4
from langgraph.graph import CompiledGraph
class BaseAgentWorkflow(ABC):
"""所有工作流的可复用基类"""
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.graph: CompiledGraph = self._build_graph()
@abstractmethod
def _build_graph(self) -> CompiledGraph:
"""子类实现具体节点和边的构建"""
pass
def run(self, input_data: dict, thread_id: str = None) -> dict:
"""同步执行工作流"""
config = {"configurable": {
"thread_id": thread_id or uuid4().hex
}}
return self.graph.invoke(input_data, config=config)
def resume(self, thread_id: str, human_input: dict) -> dict:
"""人工审核后继续执行"""
config = {"configurable": {"thread_id": thread_id}}
return self.graph.invoke(human_input, config=config)
def get_state(self, thread_id: str) -> dict:
"""查询当前工作流状态(用于前端展示进度)"""
config = {"configurable": {"thread_id": thread_id}}
return self.graph.get_state(config)
async def arun(self, input_data: dict, thread_id: str = None):
"""异步执行(高并发场景)"""
config = {"configurable": {
"thread_id": thread_id or uuid4().hex
}}
async for event in self.graph.astream_events(input_data, config=config):
yield event # 流式推送执行进度
七、性能测试与持续优化
7.1 性能测试体系
测试体系
│
├── 质量评测(RAGAS 框架)
│ ├── Faithfulness 答案忠实度(不产生幻觉)
│ ├── Answer Relevancy 答案相关性
│ ├── Context Precision 检索上下文精确率
│ ├── Context Recall 检索上下文召回率
│ └── 业务指标 预案命中率 / 告警核查准确率
│
├── 性能测试(Locust)
│ ├── 接口延迟 P50 / P95 / P99
│ ├── 并发吞吐 QPS 压测
│ └── Milvus 检索性能 百万级向量 < 50ms
│
└── 稳定性测试
├── 长时运行 7×24h 连续压测
├── 故障注入 节点宕机 / 网络分区
└── 恢复验证 Checkpoint 恢复正确性
7.2 RAGAS 自动化评测
from ragas import evaluate
from ragas.metrics import (
faithfulness, # 忠实度:答案是否基于检索内容
answer_relevancy, # 相关性:答案是否回答了问题
context_precision, # 上下文精确率:检索内容是否都有用
context_recall, # 上下文召回率:是否检索到所有必要内容
)
from datasets import Dataset
def run_ragas_evaluation(test_cases: list[dict]) -> dict:
"""
test_cases 格式:[{
"question": "...",
"answer": "...", ← RAG 系统生成的答案
"contexts": ["..."], ← 检索到的上下文列表
"ground_truth": "..." ← 人工标注的标准答案
}]
"""
dataset = Dataset.from_list(test_cases)
results = evaluate(
dataset=dataset,
metrics=[faithfulness, answer_relevancy,
context_precision, context_recall],
llm=eval_llm,
embeddings=eval_embeddings,
raise_exceptions=False
)
# 结果写入监控平台,触发告警
metrics_monitor.report({
"faithfulness": results["faithfulness"],
"answer_relevancy": results["answer_relevancy"],
"context_precision": results["context_precision"],
"context_recall": results["context_recall"],
"eval_version": current_version,
"eval_time": datetime.now().isoformat(),
})
return results
# CI/CD 集成:每次 Prompt / 模型变更后自动触发评测
# 任意指标低于阈值则阻断发布
7.3 全链路追踪(LangSmith)
from langsmith import Client
from langchain_core.tracers import LangChainTracer
# 配置 LangSmith 全链路追踪
tracer = LangChainTracer(project_name="rail-transit-agent")
# 所有 Chain / Agent 调用自动上报
result = rag_chain.invoke(
"信号系统故障的应急处置流程?",
config={"callbacks": [tracer]} # 注入追踪器
)
# 可追踪内容:
# ├── 每个节点的输入输出
# ├── LLM 调用 Token 消耗
# ├── 检索到的文档列表
# ├── 端到端延迟分布
# └── 错误堆栈与重试记录
7.4 关键指标 SLA
7.5 常见问题诊断与优化
八、技术栈总览
┌─────────────────────────────────────────────────────────────────┐
│ 技术栈全景 │
│ │
│ Agent 编排 LangGraph(复杂多步 Agent + Human-in-Loop) │
│ 基础框架 LangChain(LLM / Tool / RAG / Memory 组件) │
│ 向量数据库 Milvus(HNSW + 稀疏混合索引,分区策略) │
│ Embedding BGE-M3(中英双语,稠密 + 稀疏双路编码) │
│ 重排序 BGE-Reranker-v2(交叉编码精排) │
│ LLM GPT-4o 主 / Claude-3.5 备(Fallback 机制) │
│ 检索策略 混合检索(ANN + BM25)+ RRF + Rerank │
│ 文档解析 MinerU + DocLayout(PDF 版面 + 表格提取) │
│ 持久化 PostgreSQL(状态)+ Redis(缓存 + 语义缓存) │
│ 质量评测 RAGAS(Faithfulness / Relevancy / Recall) │
│ 链路追踪 LangSmith(全链路可观测) │
│ 监控告警 Prometheus + Grafana(指标 + 仪表盘) │
└─────────────────────────────────────────────────────────────────┘
核心思路:以 LangGraph 驱动复杂 Agent 推理与工作流编排,以 Milvus 混合检索 为知识底座,以 LangChain 提供全套基础能力组件,三层协同支撑轨道交通应急处置与智能运营全场景落地。