RAG 实战搭建:基于 Milvus 的完整知识库问答系统
🏗️ 整体架构
┌─────────────────────────────────────────────────────────┐
│ RAG 系统架构 │
│ │
│ 📄 文档输入 │
│ ↓ │
│ ✂️ 文本分块 (Chunking) │
│ ↓ │
│ 🧠 Embedding 模型 → 向量化 │
│ ↓ │
│ 🗄️ Milvus 向量存储 │
│ │
│ ❓ 用户提问 │
│ ↓ │
│ 🧠 问题向量化 │
│ ↓ │
│ 🔍 Milvus 相似度检索 │
│ ↓ │
│ 📋 召回相关文档片段 │
│ ↓ │
│ 🤖 LLM 生成回答(含上下文) │
│ ↓ │
│ 💬 返回最终答案 │
└─────────────────────────────────────────────────────────┘
🛠️ 环境准备
安装依赖
pip install pymilvus # Milvus 客户端
pip install openai # LLM 调用
pip install sentence-transformers # Embedding 模型
pip install langchain # 可选,RAG 框架
pip install pypdf # PDF 解析
pip install python-docx # Word 文档解析
启动 Milvus(Docker)
# 下载启动脚本
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml \
-O docker-compose.yml
# 启动服务
docker-compose up -d
# 验证运行状态
docker-compose ps
📁 项目结构
rag_project/
├── config.py # 配置文件
├── document_loader.py # 文档加载与分块
├── embedder.py # 向量化模块
├── vector_store.py # Milvus 操作封装
├── retriever.py # 检索模块
├── generator.py # LLM 生成模块
├── rag_pipeline.py # RAG 主流程
└── main.py # 入口文件
📝 Step 1:配置文件
# config.py
class Config:
# Milvus 配置
MILVUS_HOST = "localhost"
MILVUS_PORT = 19530
COLLECTION_NAME = "knowledge_base"
VECTOR_DIM = 768 # 向量维度,与 Embedding 模型对应
# 文本分块配置
CHUNK_SIZE = 500 # 每块字符数
CHUNK_OVERLAP = 50 # 块间重叠字符数
# 检索配置
TOP_K = 5 # 召回 Top K 个片段
SEARCH_METRIC = "COSINE" # 相似度度量方式
# LLM 配置
OPENAI_API_KEY = "your-api-key"
LLM_MODEL = "gpt-4o"
MAX_TOKENS = 1024
TEMPERATURE = 0.1 # 低温度,保证回答稳定
# Embedding 模型
EMBED_MODEL = "BAAI/bge-base-zh-v1.5" # 中文效果好
📄 Step 2:文档加载与分块
# document_loader.py
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List
import pypdf
from docx import Document
@dataclass
class TextChunk:
"""文本块数据结构"""
content: str # 文本内容
source: str # 来源文件
chunk_id: int # 块编号
metadata: dict # 附加元数据
class DocumentLoader:
def __init__(self, chunk_size=500, chunk_overlap=50):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# ── 加载不同格式文档 ──────────────────────────────
def load_pdf(self, file_path: str) -> str:
"""加载 PDF 文件"""
text = ""
with open(file_path, "rb") as f:
reader = pypdf.PdfReader(f)
for page in reader.pages:
text += page.extract_text() + "\n"
return text
def load_docx(self, file_path: str) -> str:
"""加载 Word 文档"""
doc = Document(file_path)
return "\n".join([para.text for para in doc.paragraphs if para.text.strip()])
def load_txt(self, file_path: str) -> str:
"""加载纯文本"""
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
def load_file(self, file_path: str) -> str:
"""自动识别文件类型并加载"""
suffix = Path(file_path).suffix.lower()
loaders = {
".pdf": self.load_pdf,
".docx": self.load_docx,
".txt": self.load_txt,
".md": self.load_txt,
}
loader = loaders.get(suffix)
if not loader:
raise ValueError(f"不支持的文件格式:{suffix}")
return loader(file_path)
# ── 文本清洗 ──────────────────────────────────────
def clean_text(self, text: str) -> str:
"""清洗文本:去除多余空白、特殊字符"""
text = re.sub(r'\n{3,}', '\n\n', text) # 多个空行合并
text = re.sub(r' {2,}', ' ', text) # 多个空格合并
text = text.strip()
return text
# ── 文本分块 ──────────────────────────────────────
def split_text(self, text: str, source: str) -> List[TextChunk]:
"""
滑动窗口分块策略
保留块间重叠,避免上下文信息丢失
"""
text = self.clean_text(text)
chunks = []
start = 0
chunk_id = 0
while start < len(text):
end = start + self.chunk_size
# 尝试在句子边界处切分(避免截断句子)
if end < len(text):
for sep in ['。', '!', '?', '\n', ';', '.', '!', '?']:
boundary = text.rfind(sep, start, end)
if boundary != -1:
end = boundary + 1
break
chunk_content = text[start:end].strip()
if chunk_content:
chunks.append(TextChunk(
content=chunk_content,
source=source,
chunk_id=chunk_id,
metadata={"file": source, "position": start}
))
chunk_id += 1
# 下一块起始位置(含重叠)
start = end - self.chunk_overlap
return chunks
def process_file(self, file_path: str) -> List[TextChunk]:
"""完整处理一个文件:加载 → 清洗 → 分块"""
raw_text = self.load_file(file_path)
return self.split_text(raw_text, source=file_path)
def process_directory(self, dir_path: str) -> List[TextChunk]:
"""批量处理目录下所有文档"""
all_chunks = []
for file in Path(dir_path).rglob("*"):
if file.suffix.lower() in [".pdf", ".docx", ".txt", ".md"]:
print(f"正在处理:{file.name}")
try:
chunks = self.process_file(str(file))
all_chunks.extend(chunks)
print(f" → 生成 {len(chunks)} 个文本块")
except Exception as e:
print(f" ✗ 处理失败:{e}")
return all_chunks
🧠 Step 3:Embedding 向量化
# embedder.py
from sentence_transformers import SentenceTransformer
from typing import List
import numpy as np
class Embedder:
def __init__(self, model_name: str = "BAAI/bge-base-zh-v1.5"):
"""
推荐中文模型:
- BAAI/bge-base-zh-v1.5 → 均衡选择(768维)
- BAAI/bge-large-zh-v1.5 → 效果更好(1024维)
- shibing624/text2vec-base-chinese → 轻量备选
"""
print(f"加载 Embedding 模型:{model_name}")
self.model = SentenceTransformer(model_name)
self.dimension = self.model.get_sentence_embedding_dimension()
print(f"模型加载完成,向量维度:{self.dimension}")
def embed_text(self, text: str) -> List[float]:
"""单条文本向量化"""
# BGE 模型建议加前缀提升检索效果
prefixed = f"为这个句子生成表示以用于检索相关文章:{text}"
vector = self.model.encode(prefixed, normalize_embeddings=True)
return vector.tolist()
def embed_query(self, query: str) -> List[float]:
"""查询向量化(与文档向量化保持一致)"""
return self.embed_text(query)
def embed_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
"""批量向量化(提升效率)"""
prefixed_texts = [
f"为这个句子生成表示以用于检索相关文章:{t}" for t in texts
]
vectors = self.model.encode(
prefixed_texts,
batch_size=batch_size,
normalize_embeddings=True,
show_progress_bar=True
)
return vectors.tolist()
🗄️ Step 4:Milvus 向量存储
# vector_store.py
from pymilvus import (
MilvusClient,
DataType,
CollectionSchema,
FieldSchema,
)
from typing import List, Dict
from config import Config
from document_loader import TextChunk
class MilvusVectorStore:
def __init__(self):
self.client = MilvusClient(
uri=f"http://{Config.MILVUS_HOST}:{Config.MILVUS_PORT}"
)
self.collection_name = Config.COLLECTION_NAME
self._init_collection()
def _init_collection(self):
"""初始化集合(若不存在则创建)"""
if self.client.has_collection(self.collection_name):
print(f"集合 [{self.collection_name}] 已存在")
return
# 定义 Schema
schema = self.client.create_schema(
auto_id=False,
enable_dynamic_field=True # 允许动态字段
)
# 添加字段
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=Config.VECTOR_DIM)
schema.add_field("content", DataType.VARCHAR, max_length=2000)
schema.add_field("source", DataType.VARCHAR, max_length=500)
schema.add_field("chunk_id",DataType.INT64)
# 创建 HNSW 索引(高性能图索引)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="HNSW",
metric_type=Config.SEARCH_METRIC,
params={"M": 16, "efConstruction": 200}
)
self.client.create_collection(
collection_name=self.collection_name,
schema=schema,
index_params=index_params
)
print(f"✅ 集合 [{self.collection_name}] 创建成功")
def insert_chunks(self, chunks: List[TextChunk], vectors: List[List[float]]):
"""批量插入文本块及其向量"""
data = []
for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
data.append({
"id": i,
"vector": vector,
"content": chunk.content[:2000], # 截断防超长
"source": chunk.source,
"chunk_id": chunk.chunk_id,
})
# 分批插入,避免单次请求过大
batch_size = 100
total = 0
for i in range(0, len(data), batch_size):
batch = data[i: i + batch_size]
self.client.insert(self.collection_name, batch)
total += len(batch)
print(f" 已插入 {total}/{len(data)} 条")
print(f"✅ 共插入 {len(data)} 条向量数据")
def search(self, query_vector: List[float], top_k: int = 5) -> List[Dict]:
"""向量相似度搜索"""
results = self.client.search(
collection_name=self.collection_name,
data=[query_vector],
limit=top_k,
output_fields=["content", "source", "chunk_id"],
search_params={"ef": 100} # HNSW 搜索参数,越大越精确
)
hits = []
for hit in results[0]:
hits.append({
"content": hit["entity"]["content"],
"source": hit["entity"]["source"],
"chunk_id": hit["entity"]["chunk_id"],
"score": hit["distance"], # 相似度分数
})
return hits
def get_stats(self) -> Dict:
"""获取集合统计信息"""
stats = self.client.get_collection_stats(self.collection_name)
return stats
🔍 Step 5:检索模块
# retriever.py
from typing import List, Dict
from embedder import Embedder
from vector_store import MilvusVectorStore
from config import Config
class Retriever:
def __init__(self, embedder: Embedder, vector_store: MilvusVectorStore):
self.embedder = embedder
self.vector_store = vector_store
def retrieve(self, query: str, top_k: int = None) -> List[Dict]:
"""
检索与查询最相关的文档片段
返回按相似度排序的结果列表
"""
top_k = top_k or Config.TOP_K
# 1. 查询向量化
query_vector = self.embedder.embed_query(query)
# 2. Milvus 相似度搜索
results = self.vector_store.search(query_vector, top_k=top_k)
# 3. 过滤低质量结果(余弦相似度 < 0.5 则丢弃)
filtered = [r for r in results if r["score"] >= 0.5]
if not filtered:
print("⚠️ 未找到足够相关的文档片段")
return filtered
def format_context(self, retrieved_docs: List[Dict]) -> str:
"""将检索结果格式化为 LLM 上下文字符串"""
if not retrieved_docs:
return "(未找到相关文档)"
context_parts = []
for i, doc in enumerate(retrieved_docs, 1):
context_parts.append(
f"【参考片段 {i}】来源:{doc['source']}\n{doc['content']}"
)
return "\n\n".join(context_parts)
🤖 Step 6:LLM 生成模块
# generator.py
from openai import OpenAI
from typing import List, Dict
from config import Config
class Generator:
def __init__(self):
self.client = OpenAI(api_key=Config.OPENAI_API_KEY)
def build_prompt(self, query: str, context: str) -> List[Dict]:
"""构建 RAG Prompt"""
system_prompt = """你是一个专业的知识库问答助手。
请严格根据以下提供的参考文档来回答用户问题。
回答要求:
1. 只使用参考文档中的信息,不要编造内容
2. 如果文档中没有相关信息,请明确告知用户
3. 回答要简洁、准确、有条理
4. 可以适当引用原文,并注明来源"""
user_prompt = f"""参考文档:
{context}
用户问题:{query}
请根据参考文档回答上述问题。"""
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
def generate(self, query: str, context: str) -> str:
"""调用 LLM 生成回答"""
messages = self.build_prompt(query, context)
response = self.client.chat.completions.create(
model=Config.LLM_MODEL,
messages=messages,
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE,
)
return response.choices[0].message.content
def generate_with_stream(self, query: str, context: str):
"""流式生成(逐字输出,提升用户体验)"""
messages = self.build_prompt(query, context)
stream = self.client.chat.completions.create(
model=Config.LLM_MODEL,
messages=messages,
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE,
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
yield delta.content
🔄 Step 7:RAG 主流程
# rag_pipeline.py
from document_loader import DocumentLoader
from embedder import Embedder
from vector_store import MilvusVectorStore
from retriever import Retriever
from generator import Generator
from config import Config
class RAGPipeline:
def __init__(self):
print("🚀 初始化 RAG Pipeline...")
self.loader = DocumentLoader(Config.CHUNK_SIZE, Config.CHUNK_OVERLAP)
self.embedder = Embedder(Config.EMBED_MODEL)
self.vector_store = MilvusVectorStore()
self.retriever = Retriever(self.embedder, self.vector_store)
self.generator = Generator()
print("✅ RAG Pipeline 初始化完成\n")
# ── 知识库构建 ─────────────────────────────────────
def build_knowledge_base(self, source_path: str):
"""
构建知识库
source_path: 文件路径 或 目录路径
"""
import os
print(f"📂 开始构建知识库:{source_path}")
# 1. 加载并分块
if os.path.isdir(source_path):
chunks = self.loader.process_directory(source_path)
else:
chunks = self.loader.process_file(source_path)
print(f"📄 共生成 {len(chunks)} 个文本块")
# 2. 批量向量化
print("🧠 开始向量化...")
texts = [chunk.content for chunk in chunks]
vectors = self.embedder.embed_batch(texts)
# 3. 存入 Milvus
print("🗄️ 存入 Milvus...")
self.vector_store.insert_chunks(chunks, vectors)
print(f"\n🎉 知识库构建完成!共收录 {len(chunks)} 个知识片段")
# ── 问答查询 ───────────────────────────────────────
def query(self, question: str, stream: bool = False) -> str:
"""
RAG 问答主流程
1. 检索相关文档
2. 构建上下文
3. LLM 生成回答
"""
print(f"\n❓ 用户问题:{question}")
print("-" * 50)
# Step 1: 检索
retrieved_docs = self.retriever.retrieve(question)
if not retrieved_docs:
return "抱歉,知识库中未找到与您问题相关的内容,请尝试换个方式提问。"
print(f"🔍 召回 {len(retrieved_docs)} 个相关片段:")
for i, doc in enumerate(retrieved_docs, 1):
print(f" [{i}] 相似度:{doc['score']:.4f} | {doc['source']}")
# Step 2: 格式化上下文
context = self.retriever.format_context(retrieved_docs)
# Step 3: 生成回答
print("\n🤖 生成回答...")
if stream:
# 流式输出
full_answer = ""
for token in self.generator.generate_with_stream(question, context):
print(token, end="", flush=True)
full_answer += token
print()
return full_answer
else:
answer = self.generator.generate(question, context)
print(f"\n💬 回答:{answer}")
return answer
def get_stats(self):
"""查看知识库状态"""
stats = self.vector_store.get_stats()
print(f"📊 知识库统计:{stats}")
🚀 Step 8:运行入口
# main.py
from rag_pipeline import RAGPipeline
def main():
# 初始化 RAG 系统
rag = RAGPipeline()
# ── 构建知识库 ──────────────────────────────
# 支持单个文件或整个目录
rag.build_knowledge_base("./documents")
# ── 查看统计 ────────────────────────────────
rag.get_stats()
# ── 交互式问答 ──────────────────────────────
print("\n" + "="*60)
print("💬 知识库问答系统已就绪,输入 'quit' 退出")
print("="*60)
while True:
question = input("\n请输入您的问题:").strip()
if question.lower() in ["quit", "exit", "退出"]:
print("再见!👋")
break
if not question:
continue
rag.query(question, stream=True)
if __name__ == "__main__":
main()
📊 运行效果示例
🚀 初始化 RAG Pipeline...
加载 Embedding 模型:BAAI/bge-base-zh-v1.5
模型加载完成,向量维度:768
✅ RAG Pipeline 初始化完成
📂 开始构建知识库:./documents
正在处理:产品手册.pdf
→ 生成 128 个文本块
正在处理:技术文档.md
→ 生成 56 个文本块
📄 共生成 184 个文本块
🧠 开始向量化...
已插入 100/184 条
已插入 184/184 条
🎉 知识库构建完成!共收录 184 个知识片段
============================================================
💬 知识库问答系统已就绪
============================================================
请输入您的问题:产品的退款政策是什么?
❓ 用户问题:产品的退款政策是什么?
--------------------------------------------------
🔍 召回 3 个相关片段:
[1] 相似度:0.9231 | ./documents/产品手册.pdf
[2] 相似度:0.8876 | ./documents/产品手册.pdf
[3] 相似度:0.7654 | ./documents/技术文档.md
🤖 生成回答...
根据产品手册,退款政策如下:
1. 购买后 7 天内可申请无理由退款...
2. 产品存在质量问题可在 30 天内申请退换货...
🔧 进阶优化方向
┌─────────────────────────────────────────────────────┐
│ 优化方向 │
├──────────────┬──────────────────────────────────────┤
│ 检索优化 │ 混合检索(向量 + BM25关键词) │
│ │ 重排序(Reranker)模型二次排序 │
│ │ HyDE 假设文档嵌入 │
├──────────────┼──────────────────────────────────────┤
│ 分块优化 │ 语义分块(按段落/章节) │
│ │ 父子块策略(小块检索,大块生成) │
├──────────────┼──────────────────────────────────────┤
│ 生成优化 │ 引用溯源(标注答案来源) │
│ │ 多轮对话历史管理 │
├──────────────┼──────────────────────────────────────┤
│ 工程优化 │ 异步并发处理 │
│ │ 向量缓存(Redis) │
│ │ 增量更新知识库 │
└──────────────┴──────────────────────────────────────┘
需要深入某个模块吗?比如 混合检索实现、Reranker 接入、LangChain 版本重构,或是 多轮对话管理?