一、整体架构全景

┌──────────────────────────────────────────────────────────────────────┐
│                           业务应用层                                  │
│  ┌──────────────────┐  ┌──────────────────┐  ┌───────────────────┐  │
│  │  应急预案自动匹配   │  │  综合告警智能核查  │  │  规章制度智能问答   │  │
│  └────────┬─────────┘  └────────┬─────────┘  └────────┬──────────┘  │
└───────────┼─────────────────────┼─────────────────────┼─────────────┘
            │                     │                     │
┌───────────▼─────────────────────▼─────────────────────▼─────────────┐
│                     AI Agent 编排层(LangGraph)                      │
│  ┌─────────────────┐  ┌─────────────────┐  ┌──────────────────────┐ │
│  │  应急预案 Agent   │  │  告警核查 Agent  │  │   自动化工作流 Agent   │ │
│  │  多步推理 + 循环  │  │  多源融合 + 研判  │  │  端到端业务流程编排    │ │
│  └─────────────────┘  └─────────────────┘  └──────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
            │                     │                     │
┌───────────▼─────────────────────▼─────────────────────▼─────────────┐
│                          能力中台层                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌─────────────┐  ┌──────────┐  │
│  │  RAG 检索引擎  │  │  LLM 统一网关  │  │  Tool 工具集  │  │  Memory  │  │
│  │  混合检索召回  │  │  模型统一调度  │  │  函数/API调用 │  │  状态持久  │  │
│  └──────────────┘  └──────────────┘  └─────────────┘  └──────────┘  │
└──────────────────────────────────────────────────────────────────────┘
            │
┌───────────▼──────────────────────────────────────────────────────────┐
│                          数据存储层                                   │
│  ┌───────────────┐  ┌──────────────┐  ┌──────────┐  ┌────────────┐  │
│  │  Milvus 向量库  │  │  PostgreSQL  │  │  Redis   │  │  MinIO/OSS │  │
│  │  语义检索核心   │  │  结构化数据   │  │  热点缓存  │  │  原始文档   │  │
│  └───────────────┘  └──────────────┘  └──────────┘  └────────────┘  │
└──────────────────────────────────────────────────────────────────────┘

二、AI Agent 核心系统设计

2.1 Agent 整体设计原则

设计原则
├── 可观测     每个节点输入输出全链路可追踪(LangSmith)
├── 可恢复     Checkpoint 持久化,故障后从断点继续执行
├── 可介入     关键决策节点支持 Human-in-the-Loop 人工审核
├── 可解释     每步推理过程强制输出依据,结果可溯源
└── 可扩展     节点解耦,新业务场景复用已有节点能力

2.2 应急预案自动匹配 Agent

业务流程设计

告警事件输入
      │
      ▼
┌─────────────┐     信息不足
│  事件解析节点  │ ──────────────▶ 追问澄清 ──┐
│  类型/等级/位置│                             │
└──────┬──────┘ ◀───────────────────────────┘
       │ 信息完整
       ▼
┌─────────────┐
│  意图分类节点  │  设备故障 / 人员事件 / 自然灾害 / 运营中断
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  预案检索节点  │  Milvus 语义检索 + 类型过滤
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  预案排序节点  │  LLM 重排序 + 历史命中率加权
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ 处置建议生成  │  结构化输出处置步骤 + 责任人 + 时限
└──────┬──────┘
       │
       ▼
┌─────────────┐     拒绝/修改
│ Human Review │ ──────────────▶ 返回排序节点重新生成
│  人工审核点   │
└──────┬──────┘ 确认
       │
       ▼
   结果输出 + 存档

核心代码实现

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, List, Optional

# ── 1. 定义 Agent 共享状态 Schema ─────────────────────────────────────
class EmergencyAgentState(TypedDict):
    alarm_raw: str                    # 原始告警文本
    event_type: Optional[str]         # 解析后事件类型
    event_level: Optional[str]        # 告警等级 红/橙/黄/蓝
    event_location: Optional[str]     # 事发位置
    retrieved_plans: List[dict]       # 检索到的预案列表
    ranked_plans: List[dict]          # 重排后的预案
    suggestion: Optional[str]         # 生成的处置建议
    human_feedback: Optional[str]     # 人工反馈意见
    is_approved: bool                 # 人工审核结果
    retry_count: int                  # 重试次数(防无限循环)
    trace_log: List[str]              # 每步执行日志(可解释性)

# ── 2. 定义各功能节点 ─────────────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)

def event_parser_node(state: EmergencyAgentState) -> dict:
    """事件解析:提取类型、等级、位置"""
    prompt = ChatPromptTemplate.from_template("""
    你是轨道交通应急处置专家,请解析以下告警事件:
    {alarm_raw}

    请以 JSON 格式输出:
    {{"event_type": "...", "event_level": "红/橙/黄/蓝", "event_location": "..."}}
    """)
    result = (prompt | llm).invoke({"alarm_raw": state["alarm_raw"]})
    parsed = parse_json(result.content)
    return {
        "event_type": parsed.get("event_type"),
        "event_level": parsed.get("event_level"),
        "event_location": parsed.get("event_location"),
        "trace_log": state["trace_log"] + [f"[解析] 类型={parsed.get('event_type')}"]
    }

def plan_retrieval_node(state: EmergencyAgentState) -> dict:
    """从 Milvus 检索相关预案"""
    plans = milvus_retriever.search(
        query=f"{state['event_type']} {state['alarm_raw']}",
        filters={
            "doc_type": "emergency_plan",
            "event_type": state["event_type"],
            "is_active": True
        },
        top_k=10
    )
    return {
        "retrieved_plans": plans,
        "trace_log": state["trace_log"] + [f"[检索] 召回 {len(plans)} 条预案"]
    }

def plan_ranking_node(state: EmergencyAgentState) -> dict:
    """LLM 重排序 + 历史命中率加权"""
    reranked = reranker.rerank(
        query=state["alarm_raw"],
        documents=state["retrieved_plans"],
        top_k=3
    )
    return {
        "ranked_plans": reranked,
        "trace_log": state["trace_log"] + [f"[排序] Top1={reranked[0]['title']}"]
    }

def suggestion_generator_node(state: EmergencyAgentState) -> dict:
    """生成结构化处置建议"""
    context = "\n\n".join([p["content"] for p in state["ranked_plans"]])
    prompt = ChatPromptTemplate.from_template("""
    基于以下应急预案内容,针对当前事件生成处置建议:
    事件:{alarm}  等级:{level}

    参考预案:
    {context}

    请输出:1.立即处置步骤 2.责任人 3.处置时限 4.注意事项
    """)
    result = (prompt | llm).invoke({
        "alarm": state["alarm_raw"],
        "level": state["event_level"],
        "context": context
    })
    return {"suggestion": result.content}

def human_review_node(state: EmergencyAgentState) -> dict:
    """人工审核占位节点(interrupt_before 在此暂停)"""
    return state  # 等待外部注入 human_feedback 和 is_approved

# ── 3. 条件路由函数 ───────────────────────────────────────────────────
def check_info_completeness(state: EmergencyAgentState) -> str:
    """判断事件信息是否完整"""
    if not state.get("event_type") or not state.get("event_level"):
        return "clarify"           # 信息不足,追问
    return "retrieve"              # 信息完整,检索

def check_human_decision(state: EmergencyAgentState) -> str:
    """根据人工反馈决定下一步"""
    if state.get("is_approved"):
        return "output"            # 审核通过,输出
    if state.get("retry_count", 0) >= 3:
        return "output"            # 超过重试次数,强制输出
    return "re_rank"               # 拒绝,重新排序

# ── 4. 构建 LangGraph 图 ─────────────────────────────────────────────
graph = StateGraph(EmergencyAgentState)

graph.add_node("parser",       event_parser_node)
graph.add_node("retrieval",    plan_retrieval_node)
graph.add_node("ranking",      plan_ranking_node)
graph.add_node("suggestion",   suggestion_generator_node)
graph.add_node("human_review", human_review_node)

graph.set_entry_point("parser")

graph.add_conditional_edges("parser", check_info_completeness, {
    "clarify":  "parser",          # 循环追问
    "retrieve": "retrieval"
})
graph.add_edge("retrieval",    "ranking")
graph.add_edge("ranking",      "suggestion")
graph.add_edge("suggestion",   "human_review")
graph.add_conditional_edges("human_review", check_human_decision, {
    "re_rank":  "ranking",         # 重新排序
    "output":   END
})

# ── 5. 编译 + 持久化 Checkpoint ───────────────────────────────────────
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
emergency_agent = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"]   # 到审核节点前暂停,等人工操作
)

2.3 综合告警智能核查 Agent

核查流程设计

┌───────────────────────────────────────────────────────────┐
│                   多源数据采集                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ 设备状态  │  │ 历史告警  │  │ 实时传感器│  │  视频流   │  │
│  │   API    │  │  数据库   │  │   数据   │  │  分析结果  │  │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  │
└───────┼─────────────┼─────────────┼─────────────┼─────────┘
        │             │             │             │
        └─────────────┴──────┬──────┴─────────────┘
                             │ 并行采集(异步 IO)
                             ▼
                    ┌────────────────┐
                    │  数据融合节点   │  结构化告警上下文
                    └───────┬────────┘
                            │
               ┌────────────┴────────────┐
               ▼                         ▼
       ┌───────────────┐        ┌─────────────────┐
       │  有效性判断    │        │   关联性分析     │
       │  误报 / 真实   │        │  同区段聚合      │
       └───────┬───────┘        │  因果链推断      │
               │                └────────┬────────┘
               └──────────┬─────────────┘
                           │
                  ┌────────▼────────┐
                  │   等级研判节点   │  红/橙/黄/蓝 四级定级
                  └────────┬────────┘
                           │
                  ┌────────▼────────┐
                  │  处置优先级排序  │  影响范围 × 紧急程度
                  └────────┬────────┘
                           │
                  ┌────────▼────────┐
                  │ 核查报告生成节点  │  结构化输出 + 推送责任人
                  └─────────────────┘

关键能力实现

能力

技术实现

说明

多源数据并行采集

asyncio.gather + 异步 Tool

并行调用多个数据源,降低延迟

告警去重聚合

时间窗口 + 向量相似度

同类告警合并,避免重复处置

因果链推断

LLM CoT + 知识图谱

推断告警间因果关系

等级研判

Few-shot Prompt + RAG

历史案例增强,提高研判准确性

结果可解释

强制 JSON 输出推理链

每步决策依据可溯源

from langchain_core.tools import tool
import asyncio

# 异步并行工具调用:同时获取多源数据
@tool
async def get_device_status(device_id: str) -> dict:
    """获取设备实时状态"""
    return await device_api.get_status(device_id)

@tool
async def get_historical_alarms(location: str, hours: int = 24) -> list:
    """获取历史告警记录"""
    return await alarm_db.query(location=location, hours=hours)

@tool
async def get_sensor_data(sensor_ids: list) -> dict:
    """获取传感器实时数据"""
    return await sensor_api.batch_get(sensor_ids)

async def data_fusion_node(state):
    """并行采集多源数据,降低整体延迟"""
    results = await asyncio.gather(
        get_device_status.ainvoke(state["device_id"]),
        get_historical_alarms.ainvoke(state["location"]),
        get_sensor_data.ainvoke(state["sensor_ids"]),
        return_exceptions=True         # 某路失败不影响其他路
    )
    return {"fused_context": merge_results(results)}

三、RAG 知识库检索系统

3.1 RAG 全链路设计

━━━━━━━━━━━━━━━━━━━  离线索引构建  ━━━━━━━━━━━━━━━━━━━

原始文档(规章/预案/手册)
        │
        ▼
① 文档解析      MinerU + DocLayout 版面分析
        │        ├─ 正文段落提取
        │        ├─ 表格结构化解析
        │        └─ 图片 OCR 识别
        ▼
② 智能切块      按语义边界切分(非固定长度)
        │        ├─ 规章制度:按条款号切分
        │        ├─ 应急预案:按处置步骤切分
        │        └─ 通用内容:语义段落切分
        ▼
③ 向量化        BGE-M3 双路编码
        │        ├─ 稠密向量(dim=1024)语义理解
        │        └─ 稀疏向量(BM25权重)关键词匹配
        ▼
④ 写入 Milvus   稠密 + 稀疏向量 + Metadata 一并写入

━━━━━━━━━━━━━━━━━━━  在线检索召回  ━━━━━━━━━━━━━━━━━━━

用户查询
        │
        ▼
① 查询改写      HyDE 假设文档扩展 + 多路查询生成
        │
        ▼
② 混合检索      稠密检索(ANN余弦)+ 稀疏检索(BM25)
        │        └─ RRF 倒排融合排序
        ▼
③ 粗排过滤      Metadata 过滤(文档类型/有效期/版本)
        │
        ▼
④ 精排重排      BGE-Reranker-v2 交叉编码精排
        │
        ▼
⑤ 上下文组装    召回片段 + 来源编号 + 置信度 → LLM 生成答案

3.2 智能切块策略

import re
from dataclasses import dataclass
from langchain_core.documents import Document

@dataclass
class ChunkConfig:
    max_size: int = 512
    overlap: int = 64
    min_size: int = 100

class RailTransitSmartChunker:
    """轨道交通规章制度专用语义切块器"""

    def chunk(self, document: Document) -> list[Document]:
        doc_type = self._detect_doc_type(document)

        strategy_map = {
            "regulation":     self._split_by_article,    # 规章:按条款
            "emergency_plan": self._split_by_procedure,  # 预案:按步骤
            "table":          self._keep_table_intact,   # 表格:整表保留
            "general":        self._semantic_split,      # 通用:语义段落
        }
        chunks = strategy_map.get(doc_type, self._semantic_split)(document)

        # 为每个 Chunk 注入 Metadata
        return [self._enrich_metadata(chunk, document, i)
                for i, chunk in enumerate(chunks)]

    def _split_by_article(self, doc: Document) -> list[Document]:
        """按"第X条"切分,保留父级章节作为上下文"""
        pattern = r"(第[一二三四五六七八九十百\d]+条[^\n]*)"
        parts = re.split(pattern, doc.page_content)
        chunks = []
        for i in range(1, len(parts), 2):
            title = parts[i].strip()
            content = parts[i + 1].strip() if i + 1 < len(parts) else ""
            if len(content) >= ChunkConfig.min_size:
                chunks.append(Document(
                    page_content=f"{title}\n{content}",
                    metadata={"chunk_type": "article", "article_title": title}
                ))
        return chunks

    def _enrich_metadata(self, chunk: Document,
                          source: Document, idx: int) -> Document:
        """为 Chunk 注入完整 Metadata,支持 Milvus 过滤"""
        chunk.metadata.update({
            "doc_id":         source.metadata.get("doc_id"),
            "doc_type":       source.metadata.get("doc_type"),
            "category":       source.metadata.get("category"),     # 信号/供电/车辆
            "version":        source.metadata.get("version"),
            "effective_date": source.metadata.get("effective_date"),
            "is_active":      source.metadata.get("is_active", True),
            "chunk_index":    idx,
            "char_count":     len(chunk.page_content),
        })
        return chunk

3.3 混合检索实现

from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document

class HybridRetriever(BaseRetriever):
    """稠密 + 稀疏混合检索,RRF 融合排序"""

    def _get_relevant_documents(self, query: str) -> list[Document]:

        # ① 查询改写:HyDE 生成假设文档扩展语义
        hyde_doc  = self._hyde_expansion(query)
        sub_queries = self._generate_sub_queries(query)   # 多路子查询

        # ② 稠密检索(语义向量 ANN)
        dense_hits = self.milvus.search(
            vectors=[self.embedder.embed(hyde_doc)],
            anns_field="dense_vec",
            param={"metric_type": "COSINE", "params": {"ef": 256}},
            filter=f"is_active == true",
            limit=20
        )

        # ③ 稀疏检索(BM25 关键词)
        sparse_hits = self.milvus.search(
            vectors=[self.bm25.encode(query)],
            anns_field="sparse_vec",
            param={"metric_type": "IP"},
            filter=f"is_active == true",
            limit=20
        )

        # ④ RRF 倒排融合
        fused = self._reciprocal_rank_fusion(
            [dense_hits, sparse_hits], k=60
        )

        # ⑤ BGE-Reranker 精排(Top 20 → Top 5)
        reranked = self.reranker.rerank(
            query=query,
            documents=fused[:20],
            top_k=5
        )
        return reranked

    def _reciprocal_rank_fusion(self, lists: list, k: int = 60) -> list:
        scores = {}
        for result_list in lists:
            for rank, hit in enumerate(result_list):
                doc_id = hit.id
                scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
        sorted_ids = sorted(scores, key=scores.get, reverse=True)
        return [self._id_to_doc(doc_id) for doc_id in sorted_ids]

    def _hyde_expansion(self, query: str) -> str:
        """HyDE:让 LLM 生成假设答案文档,增强语义检索"""
        prompt = f"请生成一段回答以下问题的文档片段:\n{query}"
        return self.llm.invoke(prompt).content

四、Milvus 向量数据库架构方案

4.1 集群部署架构

                      外部请求
                          │
                   负载均衡(Nginx)
                          │
              ┌───────────┴───────────┐
              ▼                       ▼
        Milvus Proxy              Milvus Proxy        ← 无状态,水平扩展
              │
    ┌─────────┴──────────┐
    ▼                    ▼
Root Coord           Query Coord                      ← 元数据 + 查询协调
    │                    │
    ▼                    ▼
Data Node × 3      Query Node × 4                     ← 写入 / 检索节点
                         │
                   Index Node × 2                     ← 异步构建索引

        ┌───────────────────────────────┐
        │         持久化存储层           │
        │  MinIO   etcd    Pulsar/Kafka  │
        │ 原始数据   元数据    消息队列    │
        └───────────────────────────────┘

4.2 Collection Schema 设计

from pymilvus import CollectionSchema, FieldSchema, DataType, Collection

def create_rail_knowledge_collection() -> Collection:
    fields = [
        # 主键与文本
        FieldSchema("id",             DataType.INT64,          is_primary=True, auto_id=True),
        FieldSchema("doc_id",         DataType.VARCHAR,        max_length=64),
        FieldSchema("chunk_text",     DataType.VARCHAR,        max_length=4096),

        # 向量字段
        FieldSchema("dense_vec",      DataType.FLOAT_VECTOR,   dim=1024),   # BGE-M3 稠密
        FieldSchema("sparse_vec",     DataType.SPARSE_FLOAT_VECTOR),        # BM25 稀疏

        # Metadata 过滤字段
        FieldSchema("doc_type",       DataType.VARCHAR,        max_length=32),
        FieldSchema("category",       DataType.VARCHAR,        max_length=64),
        FieldSchema("version",        DataType.VARCHAR,        max_length=16),
        FieldSchema("effective_date", DataType.INT64),          # 时间戳
        FieldSchema("expire_date",    DataType.INT64),          # 失效时间戳
        FieldSchema("is_active",      DataType.BOOL),
    ]

    schema = CollectionSchema(fields, description="轨交知识库")
    col = Collection(
        name="rail_transit_knowledge",
        schema=schema,
        partition_key_field="doc_type"   # 按文档类型分区,缩小检索范围
    )

    # ── 稠密向量索引:HNSW(高召回 + 低延迟)───────────────────────────
    col.create_index("dense_vec", {
        "index_type":  "HNSW",
        "metric_type": "COSINE",
        "params":      {"M": 16, "efConstruction": 256}
    })

    # ── 稀疏向量索引:倒排索引 ────────────────────────────────────────
    col.create_index("sparse_vec", {
        "index_type":  "SPARSE_INVERTED_INDEX",
        "metric_type": "IP",
        "params":      {"drop_ratio_build": 0.2}
    })

    col.load()
    return col

4.3 索引性能调优策略

调优维度

参数

推荐值

说明

HNSW 构建质量

M

16 ~ 32

越大召回越高,内存占用越多

HNSW 构建质量

efConstruction

256 ~ 400

越大索引越精准,构建越慢

HNSW 检索精度

ef(运行时)

128 ~ 256

查询时动态调整,精度 / 速度互换

稀疏向量裁剪

drop_ratio_build

0.2

裁剪低权重词,减少索引体积

分区策略

partition_key_field

doc_type

按类型分区,减少无效扫描

内存管理

MMap 开启

超大集合

向量映射到磁盘,节省内存

批量写入

batch_size

2000 ~ 5000

避免频繁小批量,提升写入吞吐

Segment 大小

maxRowsPerSegment

1,000,000

控制单 Segment 规模

4.4 数据生命周期管理

class MilvusDataManager:
    """向量数据生命周期管理:版本更新 / 失效清理"""

    def update_document_version(self, doc_id: str, new_content: str,
                                  new_version: str):
        """文档版本更新:逻辑失效旧版本 + 写入新版本"""
        # Step 1:旧版本逻辑失效(不物理删除,保留历史)
        self.collection.delete(
            expr=f'doc_id == "{doc_id}" and version != "{new_version}"'
        )
        # Step 2:写入新版本 Chunks
        new_chunks = self.chunker.chunk(new_content)
        self._batch_insert(new_chunks, doc_id, new_version)

    def cleanup_expired_docs(self):
        """定时清理过期文档(每日凌晨执行)"""
        today_ts = int(datetime.now().timestamp())
        self.collection.delete(
            expr=f"expire_date > 0 and expire_date < {today_ts}"
        )

    def _batch_insert(self, chunks: list, doc_id: str,
                       version: str, batch_size: int = 2000):
        """批量写入,避免频繁小批次"""
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i: i + batch_size]
            embeddings = self.embedder.batch_embed([c.text for c in batch])
            self.collection.insert([build_entities(batch, embeddings)])

五、LangChain 框架深度应用

5.1 LangChain 核心模块使用全景

LangChain 在本项目中的使用分工
│
├── langchain-core          基础抽象层
│   ├── BaseRetriever       自定义混合检索器基类
│   ├── BaseTool            自定义业务工具基类
│   ├── BaseMemory          对话历史管理基类
│   └── RunnableSequence    LCEL 链式组合
│
├── langchain-openai        LLM 接入
│   ├── ChatOpenAI          GPT-4o 调用
│   └── OpenAIEmbeddings    Embedding 备选
│
├── langchain-community     生态集成
│   ├── MilvusRetriever     Milvus 向量检索集成
│   └── RedisCache          LLM 调用结果缓存
│
└── langgraph               Agent 编排(重点)
    ├── StateGraph          有状态图定义
    ├── Checkpoint          状态持久化
    └── interrupt_before    Human-in-the-Loop

5.2 LCEL(LangChain 表达式语言)构建 RAG Chain

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# ── RAG Prompt 模板 ────────────────────────────────────────────────────
RAG_PROMPT = ChatPromptTemplate.from_template("""
你是轨道交通运营专家,请根据以下参考内容回答问题。
如果参考内容中没有相关信息,请明确说明,不要编造。

参考内容:
{context}

问题:{question}

请给出准确、专业的回答,并标注参考来源:
""")

# ── 构建 RAG Chain(LCEL 链式组合)──────────────────────────────────────
def format_docs(docs: list) -> str:
    """将检索结果格式化为带来源的上下文"""
    return "\n\n".join([
        f"[来源{i+1}] {doc.metadata.get('doc_type','')}-"
        f"{doc.metadata.get('article_title','')}\n{doc.page_content}"
        for i, doc in enumerate(docs)
    ])

rag_chain = (
    {
        "context":  hybrid_retriever | format_docs,   # 并行:检索 + 格式化
        "question": RunnablePassthrough()             # 原始问题直传
    }
    | RAG_PROMPT                                      # 注入 Prompt
    | ChatOpenAI(model="gpt-4o", temperature=0)       # LLM 生成
    | StrOutputParser()                               # 解析输出
)

# ── 流式输出(提升用户体验)──────────────────────────────────────────────
async def stream_answer(question: str):
    async for chunk in rag_chain.astream(question):
        yield chunk    # Server-Sent Events 推送到前端

5.3 自定义 Tool 工具集

from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field

# ── 工具输入 Schema(参数类型安全)──────────────────────────────────────
class AlarmQueryInput(BaseModel):
    location:   str = Field(description="告警发生的位置,如:1号线-人民广场站")
    alarm_type: str = Field(description="告警类型:设备故障/人员事件/运营中断")
    hours:      int = Field(default=24, description="查询最近N小时的历史告警")

class PlanSearchInput(BaseModel):
    event_description: str = Field(description="事件描述文本")
    event_level: str       = Field(description="事件等级:红/橙/黄/蓝")
    top_k: int             = Field(default=3, description="返回预案数量")

# ── 业务工具定义 ─────────────────────────────────────────────────────────
@tool(args_schema=AlarmQueryInput)
def query_historical_alarms(location: str, alarm_type: str,
                             hours: int = 24) -> list:
    """查询指定位置的历史告警记录,用于告警关联分析"""
    return alarm_database.query(location=location,
                                 alarm_type=alarm_type, hours=hours)

@tool(args_schema=PlanSearchInput)
def search_emergency_plans(event_description: str, event_level: str,
                            top_k: int = 3) -> list:
    """从应急预案知识库检索匹配预案"""
    return milvus_retriever.search(
        query=event_description,
        filters={"doc_type": "emergency_plan"},
        top_k=top_k
    )

@tool
def get_realtime_device_status(device_ids: list[str]) -> dict:
    """获取设备实时运行状态"""
    return device_api.batch_get_status(device_ids)

@tool
def send_emergency_notification(recipients: list[str],
                                 content: str, level: str) -> bool:
    """发送应急通知给责任人"""
    return notification_service.send(recipients, content, level)

# 工具注册
TOOL_REGISTRY = [
    query_historical_alarms,
    search_emergency_plans,
    get_realtime_device_status,
    send_emergency_notification,
]

5.4 LLM 调用优化

from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
import langchain

# ── 语义缓存:相似问题命中缓存,降低 LLM 调用成本 ─────────────────────
langchain.llm_cache = RedisSemanticCache(
    redis_url=REDIS_URL,
    embedding=OpenAIEmbeddings(),
    score_threshold=0.95      # 相似度 > 0.95 直接命中缓存
)

# ── Fallback:主模型失败自动切换备用模型 ────────────────────────────────
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary_llm  = ChatOpenAI(model="gpt-4o", temperature=0)
fallback_llm = ChatAnthropic(model="claude-3-5-sonnet")

llm_with_fallback = primary_llm.with_fallbacks(
    [fallback_llm],
    exceptions_to_handle=(Exception,)
)

# ── 结构化输出:强制 JSON 格式,避免解析失败 ────────────────────────────
from pydantic import BaseModel

class AlarmAnalysisResult(BaseModel):
    is_valid:      bool        # 是否有效告警
    alarm_level:   str         # 研判等级
    root_cause:    str         # 根本原因
    action_items:  list[str]   # 处置建议
    confidence:    float       # 置信度 0~1

structured_llm = llm_with_fallback.with_structured_output(AlarmAnalysisResult)

六、自动化工作流设计与编排

6.1 工作流分层架构

自动化工作流体系
│
├── L1 原子工作流      单一功能节点,不可再拆分
│   ├── 告警接收 & 解析
│   ├── 预案向量检索
│   ├── LLM 推理调用
│   ├── 通知消息发送
│   └── 结果存档写库
│
├── L2 组合工作流      多个原子流按业务逻辑编排
│   ├── 告警 → 核查 → 预案匹配 → 通知
│   ├── 设备故障 → 影响评估 → 调度优化
│   └── 规章查询 → 语义检索 → 答案生成
│
└── L3 业务场景工作流   端到端完整业务,含人工介入点
    ├── 应急预案全流程自动处置
    ├── 综合告警全流程核查研判
    └── 规章合规全流程检查

6.2 工作流可复用基类

from abc import ABC, abstractmethod
from uuid import uuid4
from langgraph.graph import CompiledGraph

class BaseAgentWorkflow(ABC):
    """所有工作流的可复用基类"""

    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.graph: CompiledGraph = self._build_graph()

    @abstractmethod
    def _build_graph(self) -> CompiledGraph:
        """子类实现具体节点和边的构建"""
        pass

    def run(self, input_data: dict, thread_id: str = None) -> dict:
        """同步执行工作流"""
        config = {"configurable": {
            "thread_id": thread_id or uuid4().hex
        }}
        return self.graph.invoke(input_data, config=config)

    def resume(self, thread_id: str, human_input: dict) -> dict:
        """人工审核后继续执行"""
        config = {"configurable": {"thread_id": thread_id}}
        return self.graph.invoke(human_input, config=config)

    def get_state(self, thread_id: str) -> dict:
        """查询当前工作流状态(用于前端展示进度)"""
        config = {"configurable": {"thread_id": thread_id}}
        return self.graph.get_state(config)

    async def arun(self, input_data: dict, thread_id: str = None):
        """异步执行(高并发场景)"""
        config = {"configurable": {
            "thread_id": thread_id or uuid4().hex
        }}
        async for event in self.graph.astream_events(input_data, config=config):
            yield event    # 流式推送执行进度

七、性能测试与持续优化

7.1 性能测试体系

测试体系
│
├── 质量评测(RAGAS 框架)
│   ├── Faithfulness         答案忠实度(不产生幻觉)
│   ├── Answer Relevancy     答案相关性
│   ├── Context Precision    检索上下文精确率
│   ├── Context Recall       检索上下文召回率
│   └── 业务指标             预案命中率 / 告警核查准确率
│
├── 性能测试(Locust)
│   ├── 接口延迟             P50 / P95 / P99
│   ├── 并发吞吐             QPS 压测
│   └── Milvus 检索性能      百万级向量 < 50ms
│
└── 稳定性测试
    ├── 长时运行             7×24h 连续压测
    ├── 故障注入             节点宕机 / 网络分区
    └── 恢复验证             Checkpoint 恢复正确性

7.2 RAGAS 自动化评测

from ragas import evaluate
from ragas.metrics import (
    faithfulness,       # 忠实度:答案是否基于检索内容
    answer_relevancy,   # 相关性:答案是否回答了问题
    context_precision,  # 上下文精确率:检索内容是否都有用
    context_recall,     # 上下文召回率:是否检索到所有必要内容
)
from datasets import Dataset

def run_ragas_evaluation(test_cases: list[dict]) -> dict:
    """
    test_cases 格式:[{
        "question":         "...",
        "answer":           "...",    ← RAG 系统生成的答案
        "contexts":         ["..."],  ← 检索到的上下文列表
        "ground_truth":     "..."     ← 人工标注的标准答案
    }]
    """
    dataset = Dataset.from_list(test_cases)

    results = evaluate(
        dataset=dataset,
        metrics=[faithfulness, answer_relevancy,
                 context_precision, context_recall],
        llm=eval_llm,
        embeddings=eval_embeddings,
        raise_exceptions=False
    )

    # 结果写入监控平台,触发告警
    metrics_monitor.report({
        "faithfulness":      results["faithfulness"],
        "answer_relevancy":  results["answer_relevancy"],
        "context_precision": results["context_precision"],
        "context_recall":    results["context_recall"],
        "eval_version":      current_version,
        "eval_time":         datetime.now().isoformat(),
    })
    return results

# CI/CD 集成:每次 Prompt / 模型变更后自动触发评测
# 任意指标低于阈值则阻断发布

7.3 全链路追踪(LangSmith)

from langsmith import Client
from langchain_core.tracers import LangChainTracer

# 配置 LangSmith 全链路追踪
tracer = LangChainTracer(project_name="rail-transit-agent")

# 所有 Chain / Agent 调用自动上报
result = rag_chain.invoke(
    "信号系统故障的应急处置流程?",
    config={"callbacks": [tracer]}     # 注入追踪器
)

# 可追踪内容:
# ├── 每个节点的输入输出
# ├── LLM 调用 Token 消耗
# ├── 检索到的文档列表
# ├── 端到端延迟分布
# └── 错误堆栈与重试记录

7.4 关键指标 SLA

指标

目标值

告警阈值

预案匹配 Top3 命中率

≥ 90%

< 85%

告警核查准确率

≥ 95%

< 90%

RAG Faithfulness

≥ 0.90

< 0.85

检索延迟 P95

< 200ms

> 500ms

Agent 端到端响应

< 5s

> 10s

工作流执行成功率

≥ 99.5%

< 99%

Milvus 百万级检索

< 50ms

> 100ms

7.5 常见问题诊断与优化

问题现象

定位手段

优化方案

预案匹配不准确

RAGAS Context Recall 低

优化切块策略 / 引入 HyDE

LLM 答案幻觉

Faithfulness 指标下降

加强 Prompt 约束 + 引用溯源强制

检索延迟高

Milvus 慢查询日志

调大 ef 参数 / 增加 Query Node

Agent 响应慢

LangSmith 链路分析

工具调用并行化 / 流式输出

工作流执行失败

节点异常日志

增加重试机制 + 人工兜底节点

状态恢复异常

Checkpoint 日志

校验 PostgreSQL Checkpoint 完整性

数据倾斜导致检索慢

Milvus Partition 分布

调整 Partition Key 策略


八、技术栈总览

┌─────────────────────────────────────────────────────────────────┐
│                        技术栈全景                                │
│                                                                 │
│  Agent 编排      LangGraph(复杂多步 Agent + Human-in-Loop)      │
│  基础框架        LangChain(LLM / Tool / RAG / Memory 组件)      │
│  向量数据库      Milvus(HNSW + 稀疏混合索引,分区策略)           │
│  Embedding       BGE-M3(中英双语,稠密 + 稀疏双路编码)           │
│  重排序          BGE-Reranker-v2(交叉编码精排)                  │
│  LLM             GPT-4o 主 / Claude-3.5 备(Fallback 机制)      │
│  检索策略        混合检索(ANN + BM25)+ RRF + Rerank            │
│  文档解析        MinerU + DocLayout(PDF 版面 + 表格提取)         │
│  持久化          PostgreSQL(状态)+ Redis(缓存 + 语义缓存)      │
│  质量评测        RAGAS(Faithfulness / Relevancy / Recall)      │
│  链路追踪        LangSmith(全链路可观测)                        │
│  监控告警        Prometheus + Grafana(指标 + 仪表盘)            │
└─────────────────────────────────────────────────────────────────┘

核心思路:以 LangGraph 驱动复杂 Agent 推理与工作流编排,以 Milvus 混合检索 为知识底座,以 LangChain 提供全套基础能力组件,三层协同支撑轨道交通应急处置与智能运营全场景落地。