利用 LangGraph 搭建智能体:在知识图谱构建过程中实现实时数据清洗
前言
在人工智能与大数据技术飞速发展的今天,知识图谱(Knowledge Graph)已成为企业智能化转型的核心基础设施之一。它不仅能够将分散的、异构的数据以结构化的方式组织起来,还能为下游的推理、问答、推荐等任务提供强有力的语义支撑。然而,知识图谱的构建从来都不是一帆风顺的——原始数据中充斥着噪声、冗余、格式不一致、实体歧义等问题,这些问题如果不在构建阶段及时处理,将严重影响图谱的质量与可用性。
传统的数据清洗方案通常采用"先批量清洗,再构建图谱"的离线流水线模式。这种方式固然稳定,但存在明显的局限性:清洗规则难以动态调整、无法感知上下文语义、面对多源异构数据时扩展性差。而随着大语言模型(LLM)与智能体(Agent)技术的成熟,一种全新的范式正在兴起——利用智能体在知识图谱构建过程中进行实时、语义感知的数据清洗。
本文将深入介绍如何利用 LangGraph 框架搭建一套具备实时数据清洗能力的知识图谱构建智能体系统。我们将从 LangGraph 的核心概念出发,逐步拆解系统架构设计、节点功能划分、数据流转逻辑,并结合具体代码示例,为读者提供一套可落地的工程实践参考。
一、理解 LangGraph:不只是链,而是图
1.1 从 LangChain 到 LangGraph 的演进
LangChain 在 LLM 应用开发领域积累了大量用户,其链式(Chain)调用模式极大简化了提示词工程与模型调用的复杂度。然而,随着应用场景的复杂化,纯链式结构的局限性愈发明显:
无法处理循环逻辑:真实的智能体往往需要根据中间结果反复迭代,而链式结构天然是单向的。
状态管理困难:跨步骤的状态传递在链式结构中较为笨拙。
缺乏条件分支:复杂的决策逻辑难以优雅表达。
LangGraph 正是为解决这些问题而生。它将智能体的执行逻辑抽象为一张有向图(Directed Graph),其中:
节点(Node):代表一个具体的计算单元,可以是 LLM 调用、工具调用、数据处理函数等。
边(Edge):代表节点之间的数据流转关系,支持条件边(Conditional Edge)实现动态路由。
状态(State):贯穿整个图执行过程的共享数据结构,每个节点可以读取并更新状态。
这种图结构天然支持循环、分支、并行等复杂控制流,使得构建复杂的多步骤智能体成为可能。
1.2 LangGraph 的核心组件
在正式进入系统设计之前,我们先梳理 LangGraph 的几个核心概念:
StateGraph(状态图)
StateGraph 是 LangGraph 的核心类,用于定义整个智能体的执行图。开发者需要先定义一个状态模式(通常使用 TypedDict),然后基于这个状态模式创建图实例。
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
class KGBuildState(TypedDict):
raw_data: List[dict] # 原始输入数据
cleaned_data: List[dict] # 清洗后的数据
entities: List[dict] # 提取的实体
relations: List[dict] # 提取的关系
kg_triples: List[tuple] # 知识图谱三元组
error_log: List[str] # 错误日志
iteration_count: int # 迭代次数
quality_score: float # 数据质量分数
节点函数
每个节点本质上是一个 Python 函数,接收当前状态作为输入,返回状态的更新部分:
def data_ingestion_node(state: KGBuildState) -> dict:
# 数据摄入逻辑
...
return {"raw_data": ingested_data}
条件边
条件边允许根据当前状态动态决定下一步执行哪个节点,这是实现智能决策的关键:
def should_reclean(state: KGBuildState) -> str:
if state["quality_score"] < 0.8:
return "data_cleaning" # 返回节点名称
return "entity_extraction"
二、系统整体架构设计
2.1 业务场景描述
假设我们需要从多源异构数据(包括新闻文章、企业公告、学术论文摘要等)中自动构建一个关于科技公司投融资关系的知识图谱。原始数据来源广泛,存在以下典型问题:
格式不统一:日期格式、金额单位、公司名称缩写各异
信息噪声:HTML 标签残留、广告语、无关段落混入
实体歧义:同一公司在不同文本中有多种称谓(如"阿里"、"阿里巴巴"、"Alibaba")
关系冗余:同一投融资事件被多篇文章重复描述
数据缺失:部分关键字段(如融资金额、时间)缺失
2.2 整体架构图
┌─────────────────────────────────────────────────────────────┐
│ LangGraph 智能体系统 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ 数据摄入 │───▶│ 数据评估 │───▶│ 条件路由(质量判断) │ │
│ │ Node │ │ Node │ │ Edge │ │
│ └──────────┘ └──────────┘ └──────────────────────┘ │
│ │ │ │
│ 质量不足 质量达标 │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ 深度清洗 │ │ 实体抽取 │ │
│ │ Node │ │ Node │ │
│ └──────────┘ └──────────┘ │
│ │ │ │
│ └────┬────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 关系抽取 │ │
│ │ Node │ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 实体消歧& │ │
│ │ 冲突解决 │ │
│ │ Node │ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 图谱写入& │ │
│ │ 质量验证 │ │
│ │ Node │ │
│ └──────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ 验证失败 验证通过 │
│ │ │ │
│ ▼ ▼ │
│ ┌────────┐ ┌──────┐ │
│ │ 回滚修复│ │ END │ │
│ └────────┘ └──────┘ │
└─────────────────────────────────────────────────────────────┘
2.3 各节点职责说明
三、核心节点的详细实现
3.1 环境准备与依赖安装
pip install langgraph langchain langchain-openai
pip install neo4j sentence-transformers
pip install pandas numpy pydantic
3.2 状态定义与初始化
from typing import TypedDict, List, Optional, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
import operator
# 定义实体和关系的数据模型
class Entity(BaseModel):
id: str
name: str
type: str # COMPANY, PERSON, INVESTMENT_ROUND, etc.
attributes: dict
confidence: float
source_text: str
class Relation(BaseModel):
head_entity_id: str
tail_entity_id: str
relation_type: str
attributes: dict
confidence: float
source_text: str
class DataQualityReport(BaseModel):
overall_score: float
issues: List[str]
suggestions: List[str]
needs_deep_cleaning: bool
# 图的状态定义
class KGBuildState(TypedDict):
# 数据流
raw_documents: List[dict]
cleaned_documents: List[dict]
quality_reports: List[DataQualityReport]
# 抽取结果
entities: Annotated[List[Entity], operator.add]
relations: Annotated[List[Relation], operator.add]
# 消歧结果
canonical_entities: List[Entity]
merged_relations: List[Relation]
# 控制流
current_batch_index: int
iteration_count: int
max_iterations: int
# 质量指标
overall_quality_score: float
# 输出
kg_write_results: List[dict]
error_log: Annotated[List[str], operator.add]
# 配置
quality_threshold: float
3.3 数据摄入节点
import json
from datetime import datetime
from langchain_core.messages import HumanMessage, SystemMessage
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def data_ingestion_node(state: KGBuildState) -> dict:
"""
数据摄入节点:负责从不同数据源读取数据,
统一封装为标准格式,并进行初步的格式规范化。
"""
print("📥 [数据摄入节点] 开始读取原始数据...")
raw_documents = state.get("raw_documents", [])
normalized_docs = []
for idx, doc in enumerate(raw_documents):
try:
normalized_doc = {
"id": doc.get("id", f"doc_{idx}_{datetime.now().timestamp()}"),
"source": doc.get("source", "unknown"),
"content": str(doc.get("content", "")),
"metadata": {
"ingestion_time": datetime.now().isoformat(),
"original_format": doc.get("format", "text"),
"language": doc.get("language", "zh"),
"char_count": len(str(doc.get("content", "")))
},
"processing_status": "ingested"
}
normalized_docs.append(normalized_doc)
except Exception as e:
error_msg = f"文档 {idx} 摄入失败: {str(e)}"
print(f" ⚠️ {error_msg}")
print(f" ✅ 成功摄入 {len(normalized_docs)}/{len(raw_documents)} 个文档")
return {
"cleaned_documents": normalized_docs,
"current_batch_index": 0,
"iteration_count": 0
}
3.4 数据质量评估节点(实时清洗的关键入口)
这是整个系统中最关键的节点之一。它利用 LLM 的语义理解能力,对每个文档进行多维度的质量评估,并生成详细的问题报告,为后续的针对性清洗提供指导。
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
quality_assessment_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的数据质量评估专家,专注于知识图谱构建场景下的数据质量分析。
你需要对输入文本进行全面的质量评估,并以JSON格式返回评估报告。
评估维度包括:
1. 完整性(Completeness):关键信息是否齐全
2. 准确性(Accuracy):信息是否准确无误
3. 一致性(Consistency):格式和表达是否统一
4. 噪声程度(Noise Level):是否包含无关信息
5. 实体清晰度(Entity Clarity):实体边界是否清晰
返回JSON格式:
{{
"overall_score": 0.0-1.0,
"dimension_scores": {{
"completeness": 0.0-1.0,
"accuracy": 0.0-1.0,
"consistency": 0.0-1.0,
"noise_level": 0.0-1.0,
"entity_clarity": 0.0-1.0
}},
"issues": ["具体问题描述1", "具体问题描述2"],
"suggestions": ["清洗建议1", "清洗建议2"],
"needs_deep_cleaning": true/false,
"cleaning_priority": "high/medium/low"
}}"""),
("human", "请评估以下文本的数据质量:\n\n{text}")
])
def data_quality_assessment_node(state: KGBuildState) -> dict:
"""
数据质量评估节点:利用 LLM 对每个文档进行语义级质量评估。
这是实时清洗决策的核心依据。
"""
print("🔍 [质量评估节点] 开始评估数据质量...")
documents = state.get("cleaned_documents", [])
quality_reports = []
total_score = 0.0
parser = JsonOutputParser()
chain = quality_assessment_prompt | llm | parser
for doc in documents:
try:
# 截取前 1000 字符进行快速评估(节省 token)
sample_text = doc["content"][:1000]
result = chain.invoke({"text": sample_text})
report = DataQualityReport(
overall_score=result.get("overall_score", 0.5),
issues=result.get("issues", []),
suggestions=result.get("suggestions", []),
needs_deep_cleaning=result.get("needs_deep_cleaning", False)
)
# 将评估报告附加到文档
doc["quality_report"] = result
doc["quality_score"] = result.get("overall_score", 0.5)
quality_reports.append(report)
total_score += report.overall_score
print(f" 📊 文档 {doc['id'][:20]}... 质量得分: {report.overall_score:.2f}")
if report.issues:
for issue in report.issues[:2]: # 只打印前两个问题
print(f" ⚠️ {issue}")
except Exception as e:
error_msg = f"文档 {doc.get('id', 'unknown')} 质量评估失败: {str(e)}"
quality_reports.append(DataQualityReport(
overall_score=0.3,
issues=[error_msg],
suggestions=["建议人工审核"],
needs_deep_cleaning=True
))
avg_score = total_score / len(documents) if documents else 0.0
print(f" 📈 整体平均质量得分: {avg_score:.2f}")
return {
"quality_reports": quality_reports,
"overall_quality_score": avg_score,
"cleaned_documents": documents # 更新文档(附带质量报告)
}
3.5 实时深度清洗节点
这是整个系统的核心亮点。与传统的基于规则的批量清洗不同,该节点利用 LLM 根据每个文档的具体质量问题,制定并执行个性化的清洗策略。
deep_cleaning_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的文本数据清洗专家,专注于知识图谱构建场景。
你的任务是根据提供的质量问题报告,对文本进行精准清洗。
清洗原则:
1. 保留所有有价值的实体信息(公司名、人名、金额、时间、地点)
2. 统一实体表达(如统一日期格式为 YYYY-MM-DD,金额统一为人民币元)
3. 去除无关噪声(广告语、HTML标签、重复内容)
4. 修复明显的错别字和格式错误
5. 标准化公司名称(保留全称,在括号中注明常用简称)
6. 保持文本的语义完整性,不要删除关键信息
返回JSON格式:
{{
"cleaned_text": "清洗后的文本",
"cleaning_actions": ["执行的清洗操作1", "执行的清洗操作2"],
"entities_normalized": {{"原始实体": "标准化后的实体"}},
"data_quality_improvement": 0.0-1.0
}}"""),
("human", """原始文本:
{original_text}
质量问题报告:
{quality_issues}
清洗建议:
{cleaning_suggestions}
请对该文本进行清洗。""")
])
def deep_cleaning_node(state: KGBuildState) -> dict:
"""
深度清洗节点:针对质量不达标的文档,
利用 LLM 进行语义感知的个性化深度清洗。
"""
print("🧹 [深度清洗节点] 开始对低质量文档进行深度清洗...")
documents = state.get("cleaned_documents", [])
quality_threshold = state.get("quality_threshold", 0.7)
parser = JsonOutputParser()
chain = deep_cleaning_prompt | llm | parser
cleaned_count = 0
for doc in documents:
quality_score = doc.get("quality_score", 1.0)
# 只对质量不达标的文档进行深度清洗
if quality_score < quality_threshold or doc.get(
"quality_report", {}
).get("needs_deep_cleaning", False):
print(f" 🔧 清洗文档 {doc['id'][:20]}... (原始得分: {quality_score:.2f})")
try:
quality_report = doc.get("quality_report", {})
result = chain.invoke({
"original_text": doc["content"],
"quality_issues": json.dumps(
quality_report.get("issues", []),
ensure_ascii=False
),
"cleaning_suggestions": json.dumps(
quality_report.get("suggestions", []),
ensure_ascii=False
)
})
# 更新文档内容
original_content = doc["content"]
doc["content"] = result.get("cleaned_text", doc["content"])
doc["cleaning_history"] = {
"original_content": original_content,
"cleaning_actions": result.get("cleaning_actions", []),
"entities_normalized": result.get("entities_normalized", {}),
"quality_improvement": result.get("data_quality_improvement", 0.0),
"cleaning_timestamp": datetime.now().isoformat()
}
doc["processing_status"] = "deep_cleaned"
improvement = result.get("data_quality_improvement", 0.0)
print(f" ✅ 清洗完成,质量提升: +{improvement:.2f}")
for action in result.get("cleaning_actions", [])[:3]:
print(f" 🔸 {action}")
cleaned_count += 1
except Exception as e:
error_msg = f"文档 {doc.get('id', 'unknown')} 深度清洗失败: {str(e)}"
print(f" ❌ {error_msg}")
doc["processing_status"] = "cleaning_failed"
print(f" ✅ 共深度清洗 {cleaned_count} 个文档")
return {
"cleaned_documents": documents,
"iteration_count": state.get("iteration_count", 0) + 1
}
3.6 实体抽取节点
entity_extraction_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的实体识别专家,专注于科技投融资领域的知识图谱构建。
请从文本中识别并提取以下类型的实体:
- COMPANY:公司/企业(包括投资方和被投资方)
- PERSON:人物(创始人、CEO、投资人等)
- INVESTMENT_ROUND:融资轮次(天使轮、A轮、B轮等)
- AMOUNT:融资金额
- DATE:日期/时间
- LOCATION:地点/地区
- TECHNOLOGY:技术领域/产品
返回JSON格式:
{{
"entities": [
{{
"name": "实体名称",
"type": "实体类型",
"attributes": {{"key": "value"}},
"confidence": 0.0-1.0,
"mention_span": "在文中的原始提及"
}}
]
}}"""),
("human", "请从以下文本中提取实体:\n\n{text}")
])
def entity_extraction_node(state: KGBuildState) -> dict:
"""
实体抽取节点:从清洗后的文档中识别并结构化提取实体。
"""
print("🏷️ [实体抽取节点] 开始提取实体...")
documents = state.get("cleaned_documents", [])
all_entities = []
parser = JsonOutputParser()
chain = entity_extraction_prompt | llm | parser
for doc in documents:
if doc.get("processing_status") == "cleaning_failed":
continue
try:
result = chain.invoke({"text": doc["content"]})
for entity_data in result.get("entities", []):
entity = Entity(
id=f"entity_{len(all_entities)}_{entity_data['name'][:10]}",
name=entity_data["name"],
type=entity_data["type"],
attributes=entity_data.get("attributes", {}),
confidence=entity_data.get("confidence", 0.8),
source_text=doc["id"]
)
all_entities.append(entity)
except Exception as e:
error_msg = f"文档 {doc.get('id', 'unknown')} 实体抽取失败: {str(e)}"
print(f" ❌ {error_msg}")
print(f" ✅ 共抽取 {len(all_entities)} 个实体")
return {"entities": all_entities}
3.7 关系抽取节点
relation_extraction_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的关系抽取专家,专注于科技投融资领域。
请根据文本内容和已识别的实体列表,抽取实体间的语义关系。
支持的关系类型:
- INVESTED_IN:投资关系(投资方 → 被投资公司)
- FOUNDED_BY:创立关系(公司 → 创始人)
- PARTICIPATED_IN:参与关系(人物 → 融资事件)
- LOCATED_IN:位置关系(公司 → 地点)
- RAISED:融资关系(公司 → 融资轮次)
- AMOUNT_OF:金额关系(融资轮次 → 金额)
返回JSON格式:
{{
"relations": [
{{
"head_entity": "头实体名称",
"tail_entity": "尾实体名称",
"relation_type": "关系类型",
"attributes": {{"date": "...", "evidence": "..."}},
"confidence": 0.0-1.0
}}
]
}}"""),
("human", """文本内容:
{text}
已识别的实体:
{entities}
请抽取实体间的关系。""")
])
def relation_extraction_node(state: KGBuildState) -> dict:
"""
关系抽取节点:基于已识别的实体和原始文本,抽取实体间的语义关系。
"""
print("🔗 [关系抽取节点] 开始抽取实体关系...")
documents = state.get("cleaned_documents", [])
entities = state.get("entities", [])
all_relations = []
# 构建文档-实体映射
doc_entity_map = {}
for entity in entities:
source = entity.source_text
if source not in doc_entity_map:
doc_entity_map[source] = []
doc_entity_map[source].append(entity)
parser = JsonOutputParser()
chain = relation_extraction_prompt | llm | parser
for doc in documents:
doc_entities = doc_entity_map.get(doc["id"], [])
if not doc_entities:
continue
try:
entities_summary = [
{"name": e.name, "type": e.type}
for e in doc_entities
]
result = chain.invoke({
"text": doc["content"],
"entities": json.dumps(entities_summary, ensure_ascii=False)
})
# 构建实体名称到ID的映射
entity_name_map = {e.name: e.id for e in doc_entities}
for rel_data in result.get("relations", []):
head_id = entity_name_map.get(rel_data["head_entity"])
tail_id = entity_name_map.get(rel_data["tail_entity"])
if head_id and tail_id:
relation = Relation(
head_entity_id=head_id,
tail_entity_id=tail_id,
relation_type=rel_data["relation_type"],
attributes=rel_data.get("attributes", {}),
confidence=rel_data.get("confidence", 0.7),
source_text=doc["id"]
)
all_relations.append(relation)
except Exception as e:
print(f" ❌ 文档 {doc.get('id', 'unknown')} 关系抽取失败: {str(e)}")
print(f" ✅ 共抽取 {len(all_relations)} 条关系")
return {"relations": all_relations}
3.8 实体消歧节点
实体消歧是知识图谱构建中的难题,也是数据清洗的重要组成部分。这里结合向量相似度与 LLM 判断,实现高精度的实体对齐。
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
entity_disambiguation_prompt = ChatPromptTemplate.from_messages([
("system", """你是实体消歧专家。请判断以下两个实体是否指代同一真实世界对象。
判断标准:
- 考虑名称的相似性(包括缩写、别名、英文名等)
- 考虑类型是否一致
- 考虑属性信息是否相互印证
返回JSON:
{{
"are_same": true/false,
"confidence": 0.0-1.0,
"canonical_name": "标准化名称(如果是同一实体)",
"reasoning": "判断理由"
}}"""),
("human", """实体A:
名称:{entity_a_name}
类型:{entity_a_type}
属性:{entity_a_attrs}
实体B:
名称:{entity_b_name}
类型:{entity_b_type}
属性:{entity_b_attrs}
请判断这两个实体是否指代同一对象。""")
])
def entity_disambiguation_node(state: KGBuildState) -> dict:
"""
实体消歧节点:通过向量相似度初筛 + LLM 精确判断,
解决知识图谱构建中的实体对齐问题。
"""
print("🔄 [实体消歧节点] 开始处理实体消歧...")
entities = state.get("entities", [])
if len(entities) < 2:
return {
"canonical_entities": entities,
"merged_relations": state.get("relations", [])
}
# Step 1: 向量化所有实体名称
entity_names = [e.name for e in entities]
embeddings = embedding_model.encode(entity_names)
similarity_matrix = cosine_similarity(embeddings)
# Step 2: 找出相似度高的候选对(阈值 0.8)
candidate_pairs = []
for i in range(len(entities)):
for j in range(i + 1, len(entities)):
if (similarity_matrix[i][j] > 0.8 and
entities[i].type == entities[j].type):
candidate_pairs.append((i, j, similarity_matrix[i][j]))
print(f" 📊 发现 {len(candidate_pairs)} 对候选相似实体")
# Step 3: 对候选对使用 LLM 进行精确判断
parser = JsonOutputParser()
chain = entity_disambiguation_prompt | llm | parser
merge_map = {} # {entity_id: canonical_entity_id}
for i, j, sim_score in candidate_pairs:
entity_a = entities[i]
entity_b = entities[j]
try:
result = chain.invoke({
"entity_a_name": entity_a.name,
"entity_a_type": entity_a.type,
"entity_a_attrs": json.dumps(entity_a.attributes, ensure_ascii=False),
"entity_b_name": entity_b.name,
"entity_b_type": entity_b.type,
"entity_b_attrs": json.dumps(entity_b.attributes, ensure_ascii=False),
})
if result.get("are_same") and result.get("confidence", 0) > 0.75:
canonical_name = result.get("canonical_name", entity_a.name)
print(f" 🔀 合并: '{entity_a.name}' + '{entity_b.name}' → '{canonical_name}'")
# 保留 entity_a,将 entity_b 映射到 entity_a
entity_a.name = canonical_name
merge_map[entity_b.id] = entity_a.id
except Exception as e:
print(f" ⚠️ 实体对 ({entity_a.name}, {entity_b.name}) 消歧失败: {e}")
# Step 4: 去重并更新关系中的实体引用
canonical_entity_ids = set(e.id for e in entities) - set(merge_map.keys())
canonical_entities = [e for e in entities if e.id in canonical_entity_ids]
# 更新关系
relations = state.get("relations", [])
merged_relations = []
for rel in relations:
new_rel = Relation(
head_entity_id=merge_map.get(rel.head_entity_id, rel.head_entity_id),
tail_entity_id=merge_map.get(rel.tail_entity_id, rel.tail_entity_id),
relation_type=rel.relation_type,
attributes=rel.attributes,
confidence=rel.confidence,
source_text=rel.source_text
)
merged_relations.append(new_rel)
# 去除自环关系
merged_relations = [
r for r in merged_relations
if r.head_entity_id != r.tail_entity_id
]
print(f" ✅ 消歧完成:{len(entities)} 个实体 → {len(canonical_entities)} 个规范实体")
print(f" ✅ 关系数量:{len(relations)} → {len(merged_relations)}(去除自环)")
return {
"canonical_entities": canonical_entities,
"merged_relations": merged_relations
}
3.9 图谱写入与验证节点
from neo4j import GraphDatabase
class Neo4jWriter:
def __init__(self, uri: str, user: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def write_entities(self, entities: List[Entity]) -> List[dict]:
results = []
with self.driver.session() as session:
for entity in entities:
try:
query = """
MERGE (e:Entity {id: $id})
SET e.name = $name,
e.type = $type,
e.confidence = $confidence,
e.updated_at = datetime()
WITH e
CALL apoc.create.addLabels(e, [$type]) YIELD node
RETURN node.id as id
"""
result = session.run(query,
id=entity.id,
name=entity.name,
type=entity.type,
confidence=entity.confidence
)
results.append({"entity_id": entity.id, "status": "success"})
except Exception as e:
results.append({"entity_id": entity.id, "status": "failed", "error": str(e)})
return results
def write_relations(self, relations: List[Relation]) -> List[dict]:
results = []
with self.driver.session() as session:
for rel in relations:
try:
query = f"""
MATCH (h:Entity {{id: $head_id}})
MATCH (t:Entity {{id: $tail_id}})
MERGE (h)-[r:{rel.relation_type}]->(t)
SET r.confidence = $confidence,
r.updated_at = datetime()
RETURN type(r) as rel_type
"""
session.run(query,
head_id=rel.head_entity_id,
tail_id=rel.tail_entity_id,
confidence=rel.confidence
)
results.append({"relation": f"{rel.head_entity_id}->{rel.tail_entity_id}", "status": "success"})
except Exception as e:
results.append({"relation": f"{rel.head_entity_id}->{rel.tail_entity_id}", "status": "failed", "error": str(e)})
return results
def kg_write_and_validate_node(state: KGBuildState) -> dict:
"""
图谱写入与验证节点:将实体和关系写入图数据库,
并进行完整性校验。
"""
print("💾 [图谱写入节点] 开始写入知识图谱...")
entities = state.get("canonical_entities", [])
relations = state.get("merged_relations", [])
# 初始化 Neo4j 写入器(实际使用时从配置读取)
writer = Neo4jWriter(
uri="bolt://localhost:7687",
user="neo4j",
password="password"
)
# 写入实体
entity_results = writer.write_entities(entities)
success_entities = sum(1 for r in entity_results if r["status"] == "success")
# 写入关系
relation_results = writer.write_relations(relations)
success_relations = sum(1 for r in relation_results if r["status"] == "success")
print(f" ✅ 实体写入: {success_entities}/{len(entities)} 成功")
print(f" ✅ 关系写入: {success_relations}/{len(relations)} 成功")
# 计算写入成功率
total_items = len(entities) + len(relations)
success_items = success_entities + success_relations
write_success_rate = success_items / total_items if total_items > 0 else 0.0
kg_write_results = {
"entity_results": entity_results,
"relation_results": relation_results,
"write_success_rate": write_success_rate,
"timestamp": datetime.now().isoformat()
}
return {
"kg_write_results": [kg_write_results],
"overall_quality_score": write_success_rate
}
四、图的组装与条件路由
4.1 定义条件路由函数
def quality_routing(state: KGBuildState) -> str:
"""
质量路由:根据数据质量评估结果决定是否需要深度清洗。
"""
quality_score = state.get("overall_quality_score", 0.0)
iteration_count = state.get("iteration_count", 0)
max_iterations = state.get("max_iterations", 3)
quality_threshold = state.get("quality_threshold", 0.7)
# 防止无限循环
if iteration_count >= max_iterations:
print(f" ⚠️ 已达最大迭代次数 ({max_iterations}),强制进入下一步")
return "proceed_to_extraction"
if quality_score < quality_threshold:
print(f" 🔁 质量得分 ({quality_score:.2f}) 低于阈值 ({quality_threshold}),触发深度清洗")
return "needs_deep_cleaning"
else:
print(f" ✅ 质量得分 ({quality_score:.2f}) 达标,继续实体抽取")
return "proceed_to_extraction"
def write_validation_routing(state: KGBuildState) -> str:
"""
写入验证路由:根据写入成功率决定是否需要回滚修复。
"""
write_results = state.get("kg_write_results", [])
if not write_results:
return "end"
latest_result = write_results[-1]
success_rate = latest_result.get("write_success_rate", 0.0)
if success_rate < 0.9:
print(f" ⚠️ 写入成功率 ({success_rate:.2%}) 不足,触发回滚修复")
return "needs_repair"
print(f" ✅ 写入成功率 ({success_rate:.2%}) 达标,任务完成")
return "end"
def rollback_repair_node(state: KGBuildState) -> dict:
"""回滚修复节点:处理写入失败的数据"""
print("🔧 [回滚修复节点] 开始处理写入失败的数据...")
# 实现具体的修复逻辑...
return {"error_log": ["部分数据写入失败,已记录待人工审核"]}
4.2 构建完整的 LangGraph 图
def build_kg_agent() -> StateGraph:
"""
构建完整的知识图谱构建智能体图。
"""
# 初始化图
workflow = StateGraph(KGBuildState)
# 添加所有节点
workflow.add_node("data_ingestion", data_ingestion_node)
workflow.add_node("quality_assessment", data_quality_assessment_node)
workflow.add_node("deep_cleaning", deep_cleaning_node)
workflow.add_node("entity_extraction", entity_extraction_node)
workflow.add_node("relation_extraction", relation_extraction_node)
workflow.add_node("entity_disambiguation", entity_disambiguation_node)
workflow.add_node("kg_write_validate", kg_write_and_validate_node)
workflow.add_node("rollback_repair", rollback_repair_node)
# 设置入口节点
workflow.set_entry_point("data_ingestion")
# 添加固定边(确定性流转)
workflow.add_edge("data_ingestion", "quality_assessment")
workflow.add_edge("deep_cleaning", "quality_assessment") # 清洗后重新评估
workflow.add_edge("entity_extraction", "relation_extraction")
workflow.add_edge("relation_extraction", "entity_disambiguation")
workflow.add_edge("entity_disambiguation", "kg_write_validate")
workflow.add_edge("rollback_repair", END)
# 添加条件边(动态路由)
workflow.add_conditional_edges(
"quality_assessment",
quality_routing,
{
"needs_deep_cleaning": "deep_cleaning",
"proceed_to_extraction": "entity_extraction"
}
)
workflow.add_conditional_edges(
"kg_write_validate",
write_validation_routing,
{
"needs_repair": "rollback_repair",
"end": END
}
)
# 编译图
app = workflow.compile()
return app
4.3 运行智能体
def run_kg_builder(documents: List[dict]) -> dict:
"""
运行知识图谱构建智能体。
"""
print("🚀 启动知识图谱构建智能体...")
print("=" * 60)
# 构建智能体
app = build_kg_agent()
# 初始化状态
initial_state = {
"raw_documents": documents,
"cleaned_documents": [],
"quality_reports": [],
"entities": [],
"relations": [],
"canonical_entities": [],
"merged_relations": [],
"current_batch_index": 0,
"iteration_count": 0,
"max_iterations": 3,
"overall_quality_score": 0.0,
"kg_write_results": [],
"error_log": [],
"quality_threshold": 0.75
}
# 执行智能体(流式输出每个步骤的结果)
final_state = None
for step_output in app.stream(initial_state):
node_name = list(step_output.keys())[0]
print(f"\n{'='*60}")
print(f"✨ 节点 [{node_name}] 执行完成")
final_state = step_output[node_name]
print("\n" + "="*60)
print("🎉 知识图谱构建完成!")
if final_state:
entities = final_state.get("canonical_entities", [])
relations = final_state.get("merged_relations", [])
errors = final_state.get("error_log", [])
print(f"\n📊 最终统计:")
print(f" • 规范实体数量:{len(entities)}")
print(f" • 关系数量:{len(relations)}")
print(f" • 错误日志条数:{len(errors)}")
return final_state
# 示例调用
if __name__ == "__main__":
sample_documents = [
{
"id": "doc_001",
"source": "tech_news",
"content": """
字节跳动旗下TikTok于2023年12月完成新一轮融资,
融资金额约50亿美元,由软银集团领投,腾讯战略跟投。
字节跳动CEO梁汝波表示,此次融资将主要用于AI技术研发。
""",
"format": "text",
"language": "zh"
},
{
"id": "doc_002",
"source": "investment_report",
"content": """
【广告】限时优惠!!
OpenAI完成新融资<br>金额:$10B<br>
时间:2024年1月
投资方包括:微软(Microsoft)、老虎环球基金
CEO:Sam Altman山姆·奥特曼
""",
"format": "html_mixed",
"language": "zh_en_mixed"
}
]
result = run_kg_builder(sample_documents)
五、系统优化与最佳实践
5.1 并行处理优化
对于大规模数据处理场景,可以利用 LangGraph 的并行节点特性,同时对多个文档批次进行处理:
from langgraph.graph import StateGraph
import asyncio
async def parallel_cleaning_node(state: KGBuildState) -> dict:
"""
并行清洗节点:同时处理多个文档,提升吞吐量。
"""
documents = state.get("cleaned_documents", [])
# 将文档分成多个批次
batch_size = 5
batches = [documents[i:i+batch_size] for i in range(0, len(documents), batch_size)]
async def clean_batch(batch):
tasks = []
for doc in batch:
if doc.get("quality_score", 1.0) < 0.75:
tasks.append(asyncio.create_task(
async_clean_single_document(doc)
))
return await asyncio.gather(*tasks, return_exceptions=True)
# 并行处理所有批次
all_tasks = [clean_batch(batch) for batch in batches]
results = await asyncio.gather(*all_tasks)
return {"cleaned_documents": documents}
5.2 检查点(Checkpoint)与断点续传
LangGraph 内置了检查点机制,对于长时间运行的知识图谱构建任务至关重要:
from langgraph.checkpoint.sqlite import SqliteSaver
def build_kg_agent_with_checkpoint():
"""
构建带检查点的知识图谱智能体,支持断点续传。
"""
workflow = StateGraph(KGBuildState)
# ... 添加节点和边 ...
# 使用 SQLite 作为检查点存储
memory = SqliteSaver.from_conn_string("kg_build_checkpoints.db")
app = workflow.compile(checkpointer=memory)
return app
def resume_kg_builder(thread_id: str, documents: List[dict] = None):
"""
续接之前未完成的构建任务。
"""
app = build_kg_agent_with_checkpoint()
config = {"configurable": {"thread_id": thread_id}}
if documents:
# 新任务
initial_state = {"raw_documents": documents, ...}
return app.invoke(initial_state, config=config)
else:
# 续接任务
return app.invoke(None, config=config)
5.3 可观测性与监控
在生产环境中,对智能体的每一步执行进行详细的日志记录和性能监控是必要的:
from langchain.callbacks.base import BaseCallbackHandler
from datetime import datetime
import logging
class KGBuildMonitor(BaseCallbackHandler):
"""
知识图谱构建监控回调。
"""
def __init__(self):
self.metrics = {
"node_execution_times": {},
"llm_call_counts": 0,
"total_tokens_used": 0,
"errors": []
}
self.node_start_times = {}
def on_chain_start(self, serialized, inputs, **kwargs):
node_name = serialized.get("name", "unknown")
self.node_start_times[node_name] = datetime.now()
logging.info(f"[MONITOR] 节点 {node_name} 开始执行")
def on_chain_end(self, outputs, **kwargs):
# 记录执行时间等指标
pass
def on_llm_end(self, response, **kwargs):
self.metrics["llm_call_counts"] += 1
# 累计 token 使用量
if hasattr(response, "llm_output"):
usage = response.llm_output.get("token_usage", {})
self.metrics["total_tokens_used"] += usage.get("total_tokens", 0)
def get_report(self) -> dict:
return {
"total_llm_calls": self.metrics["llm_call_counts"],
"total_tokens": self.metrics["total_tokens_used"],
"node_times": self.metrics["node_execution_times"],
"error_count": len(self.metrics["errors"])
}
5.4 数据清洗质量的持续改进
通过积累清洗历史,可以不断优化清洗策略:
class CleaningHistoryManager:
"""
清洗历史管理器:积累清洗经验,持续优化清洗策略。
"""
def __init__(self, storage_path: str = "cleaning_history.json"):
self.storage_path = storage_path
self.history = self._load_history()
def record_cleaning(self,
original_text: str,
cleaned_text: str,
quality_before: float,
quality_after: float,
cleaning_actions: List[str]):
"""记录一次清洗操作"""
record = {
"timestamp": datetime.now().isoformat(),
"quality_improvement": quality_after - quality_before,
"actions": cleaning_actions,
"text_length": len(original_text),
"effective": quality_after > quality_before + 0.1
}
self.history.append(record)
self._save_history()
def get_effective_patterns(self) -> List[str]:
"""获取历史上效果最好的清洗模式"""
effective_records = [r for r in self.history if r["effective"]]
action_scores = {}
for record in effective_records:
for action in record["actions"]:
if action not in action_scores:
action_scores[action] = 0
action_scores[action] += record["quality_improvement"]
# 返回排名前10的有效清洗操作
sorted_actions = sorted(
action_scores.items(),
key=lambda x: x[1],
reverse=True
)
return [action for action, _ in sorted_actions[:10]]
def generate_cleaning_prompt_enhancement(self) -> str:
"""基于历史数据生成清洗提示词增强内容"""
effective_patterns = self.get_effective_patterns()
if not effective_patterns:
return ""
enhancement = "\n\n根据历史清洗经验,以下操作通常效果最好:\n"
for i, pattern in enumerate(effective_patterns, 1):
enhancement += f"{i}. {pattern}\n"
return enhancement
def _load_history(self) -> List[dict]:
try:
with open(self.storage_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
def _save_history(self):
with open(self.storage_path, 'w', encoding='utf-8') as f:
json.dump(self.history, f, ensure_ascii=False, indent=2)
六、与传统方案的对比分析
6.1 实时智能清洗 vs 批量规则清洗
6.2 适用场景建议
推荐使用 LangGraph 实时智能清洗的场景:
多源异构数据,格式差异大
数据中存在大量需要语义理解的实体歧义问题
知识图谱领域较为复杂,规则难以穷举
对数据质量要求高,允许一定的处理延迟
数据规模在百万条以内
仍建议使用传统规则清洗的场景:
数据格式相对统一,规则清洗已足够
数据规模极大(亿级以上),对处理速度要求极高
成本敏感,API 调用费用不可承受
需要完全可控的清洗逻辑
混合策略(推荐):
实践中,最优方案往往是将两者结合——先用规则清洗处理明显的格式问题,再用 LangGraph 智能体处理需要语义理解的复杂清洗任务,从而在效率与质量之间取得平衡。
七、生产环境的注意事项
7.1 错误处理与容错机制
在生产环境中,LLM API 可能出现超时、限流、结果格式异常等问题,必须建立完善的容错机制:
from tenacity import retry, stop_after_attempt, wait_exponential
import functools
def with_fallback(fallback_value):
"""装饰器:为节点函数提供降级处理"""
def decorator(func):
@functools.wraps(func)
def wrapper(state, *args, **kwargs):
try:
return func(state, *args, **kwargs)
except Exception as e:
logging.error(f"节点 {func.__name__} 执行失败: {e}")
return {
"error_log": [f"{func.__name__} 失败: {str(e)}"],
**fallback_value
}
return wrapper
return decorator
@with_fallback({"overall_quality_score": 0.5})
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10))
def robust_quality_assessment_node(state: KGBuildState) -> dict:
"""带重试机制的质量评估节点"""
return data_quality_assessment_node(state)
7.2 Token 成本控制
对于大规模数据处理,Token 成本是重要考量:
class TokenBudgetManager:
"""Token 预算管理器"""
def __init__(self, daily_budget: int = 1_000_000):
self.daily_budget = daily_budget
self.used_today = 0
def can_use_llm(self, estimated_tokens: int) -> bool:
return (self.used_today + estimated_tokens) < self.daily_budget
def get_smart_sampling(self, text: str, max_tokens: int = 500) -> str:
"""智能截断:保留最有价值的部分"""
# 优先保留文本的开头和关键实体密集区域
sentences = text.split('。')
selected = []
current_length = 0
for sentence in sentences:
if current_length + len(sentence) <= max_tokens * 2:
selected.append(sentence)
current_length += len(sentence)
else:
break
return '。'.join(selected)
7.3 知识图谱质量的持续验证
def kg_quality_validator(neo4j_driver) -> dict:
"""
知识图谱质量验证:定期检查图谱的完整性和一致性。
"""
validation_queries = {
"orphan_entities": """
MATCH (e:Entity)
WHERE NOT (e)-[]-()
RETURN count(e) as count
""",
"low_confidence_relations": """
MATCH ()-[r]-()
WHERE r.confidence < 0.5
RETURN count(r) as count
""",
"duplicate_entities": """
MATCH (e:Entity)
WITH e.name as name, count(*) as cnt
WHERE cnt > 1
RETURN count(*) as count
"""
}
results = {}
with neo4j_driver.session() as session:
for check_name, query in validation_queries.items():
result = session.run(query)
results[check_name] = result.single()["count"]
return results
八、总结与展望
8.1 核心价值回顾
本文介绍的基于 LangGraph 的实时智能数据清洗系统,在知识图谱构建领域带来了以下核心价值:
语义感知清洗:突破了传统规则清洗的语义理解瓶颈,能够处理需要上下文理解的复杂数据质量问题。
动态自适应:通过 LangGraph 的条件路由机制,系统能够根据数据质量的实时评估结果,动态调整清洗策略,避免了过度清洗或清洗不足的问题。
流程闭环:从数据摄入、质量评估、深度清洗、实体抽取、关系抽取、实体消歧到图谱写入与验证,形成了一个完整的自动化闭环,大幅降低了人工干预的需求。
可扩展架构:LangGraph 的图结构天然支持节点的增减和替换,便于根据业务需求快速扩展新的处理能力。
8.2 未来发展方向
随着技术的持续演进,基于 LangGraph 的知识图谱构建系统还有以下值得探索的方向:
多模态数据支持:将图片、表格、PDF 等非结构化数据纳入处理范围,结合视觉语言模型(VLM)实现多模态实体和关系的抽取与清洗。
主动学习集成:将人工标注的高质量数据反馈到系统中,通过少样本学习(Few-shot Learning)持续提升清洗和抽取的精度。
分布式智能体协作:在超大规模数据场景下,探索多个 LangGraph 智能体协同工作的架构,实现真正意义上的分布式知识图谱实时构建。
图谱推理增强清洗:利用已构建的知识图谱本身进行反向验证,通过图谱中的已有知识来辅助判断新数据的质量与正确性,实现"以图养图"的良性循环。
知识图谱与大语言模型的深度融合,正在重塑企业知识管理与智能决策的基础设施。LangGraph 作为这一融合过程中的重要工程工具,将在未来的智能化建设中扮演越来越重要的角色。希望本文的探讨能为从事相关领域工作的工程师和研究者提供有价值的参考与启发。
本文代码示例基于 LangGraph 0.2.x 版本,部分 API 在未来版本中可能有所变动,请以官方文档为准。完整项目代码可在 GitHub 仓库中获取。