🏗️ 整体架构

┌─────────────────────────────────────────────────────────┐
│                    RAG 系统架构                           │
│                                                         │
│  📄 文档输入                                             │
│     ↓                                                   │
│  ✂️  文本分块 (Chunking)                                 │
│     ↓                                                   │
│  🧠 Embedding 模型 → 向量化                              │
│     ↓                                                   │
│  🗄️  Milvus 向量存储                                     │
│                                                         │
│  ❓ 用户提问                                             │
│     ↓                                                   │
│  🧠 问题向量化                                           │
│     ↓                                                   │
│  🔍 Milvus 相似度检索                                    │
│     ↓                                                   │
│  📋 召回相关文档片段                                      │
│     ↓                                                   │
│  🤖 LLM 生成回答(含上下文)                              │
│     ↓                                                   │
│  💬 返回最终答案                                          │
└─────────────────────────────────────────────────────────┘

🛠️ 环境准备

安装依赖

pip install pymilvus          # Milvus 客户端
pip install openai            # LLM 调用
pip install sentence-transformers  # Embedding 模型
pip install langchain         # 可选,RAG 框架
pip install pypdf             # PDF 解析
pip install python-docx       # Word 文档解析

启动 Milvus(Docker)

# 下载启动脚本
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml \
     -O docker-compose.yml

# 启动服务
docker-compose up -d

# 验证运行状态
docker-compose ps

📁 项目结构

rag_project/
├── config.py          # 配置文件
├── document_loader.py # 文档加载与分块
├── embedder.py        # 向量化模块
├── vector_store.py    # Milvus 操作封装
├── retriever.py       # 检索模块
├── generator.py       # LLM 生成模块
├── rag_pipeline.py    # RAG 主流程
└── main.py            # 入口文件

📝 Step 1:配置文件

# config.py

class Config:
    # Milvus 配置
    MILVUS_HOST = "localhost"
    MILVUS_PORT = 19530
    COLLECTION_NAME = "knowledge_base"
    VECTOR_DIM = 768           # 向量维度,与 Embedding 模型对应

    # 文本分块配置
    CHUNK_SIZE = 500           # 每块字符数
    CHUNK_OVERLAP = 50         # 块间重叠字符数

    # 检索配置
    TOP_K = 5                  # 召回 Top K 个片段
    SEARCH_METRIC = "COSINE"   # 相似度度量方式

    # LLM 配置
    OPENAI_API_KEY = "your-api-key"
    LLM_MODEL = "gpt-4o"
    MAX_TOKENS = 1024
    TEMPERATURE = 0.1          # 低温度,保证回答稳定

    # Embedding 模型
    EMBED_MODEL = "BAAI/bge-base-zh-v1.5"  # 中文效果好

📄 Step 2:文档加载与分块

# document_loader.py

import re
from pathlib import Path
from dataclasses import dataclass
from typing import List
import pypdf
from docx import Document


@dataclass
class TextChunk:
    """文本块数据结构"""
    content: str          # 文本内容
    source: str           # 来源文件
    chunk_id: int         # 块编号
    metadata: dict        # 附加元数据


class DocumentLoader:

    def __init__(self, chunk_size=500, chunk_overlap=50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    # ── 加载不同格式文档 ──────────────────────────────

    def load_pdf(self, file_path: str) -> str:
        """加载 PDF 文件"""
        text = ""
        with open(file_path, "rb") as f:
            reader = pypdf.PdfReader(f)
            for page in reader.pages:
                text += page.extract_text() + "\n"
        return text

    def load_docx(self, file_path: str) -> str:
        """加载 Word 文档"""
        doc = Document(file_path)
        return "\n".join([para.text for para in doc.paragraphs if para.text.strip()])

    def load_txt(self, file_path: str) -> str:
        """加载纯文本"""
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    def load_file(self, file_path: str) -> str:
        """自动识别文件类型并加载"""
        suffix = Path(file_path).suffix.lower()
        loaders = {
            ".pdf":  self.load_pdf,
            ".docx": self.load_docx,
            ".txt":  self.load_txt,
            ".md":   self.load_txt,
        }
        loader = loaders.get(suffix)
        if not loader:
            raise ValueError(f"不支持的文件格式:{suffix}")
        return loader(file_path)

    # ── 文本清洗 ──────────────────────────────────────

    def clean_text(self, text: str) -> str:
        """清洗文本:去除多余空白、特殊字符"""
        text = re.sub(r'\n{3,}', '\n\n', text)   # 多个空行合并
        text = re.sub(r' {2,}', ' ', text)         # 多个空格合并
        text = text.strip()
        return text

    # ── 文本分块 ──────────────────────────────────────

    def split_text(self, text: str, source: str) -> List[TextChunk]:
        """
        滑动窗口分块策略
        保留块间重叠,避免上下文信息丢失
        """
        text = self.clean_text(text)
        chunks = []
        start = 0
        chunk_id = 0

        while start < len(text):
            end = start + self.chunk_size

            # 尝试在句子边界处切分(避免截断句子)
            if end < len(text):
                for sep in ['。', '!', '?', '\n', ';', '.', '!', '?']:
                    boundary = text.rfind(sep, start, end)
                    if boundary != -1:
                        end = boundary + 1
                        break

            chunk_content = text[start:end].strip()

            if chunk_content:
                chunks.append(TextChunk(
                    content=chunk_content,
                    source=source,
                    chunk_id=chunk_id,
                    metadata={"file": source, "position": start}
                ))
                chunk_id += 1

            # 下一块起始位置(含重叠)
            start = end - self.chunk_overlap

        return chunks

    def process_file(self, file_path: str) -> List[TextChunk]:
        """完整处理一个文件:加载 → 清洗 → 分块"""
        raw_text = self.load_file(file_path)
        return self.split_text(raw_text, source=file_path)

    def process_directory(self, dir_path: str) -> List[TextChunk]:
        """批量处理目录下所有文档"""
        all_chunks = []
        for file in Path(dir_path).rglob("*"):
            if file.suffix.lower() in [".pdf", ".docx", ".txt", ".md"]:
                print(f"正在处理:{file.name}")
                try:
                    chunks = self.process_file(str(file))
                    all_chunks.extend(chunks)
                    print(f"  → 生成 {len(chunks)} 个文本块")
                except Exception as e:
                    print(f"  ✗ 处理失败:{e}")
        return all_chunks

🧠 Step 3:Embedding 向量化

# embedder.py

from sentence_transformers import SentenceTransformer
from typing import List
import numpy as np


class Embedder:

    def __init__(self, model_name: str = "BAAI/bge-base-zh-v1.5"):
        """
        推荐中文模型:
        - BAAI/bge-base-zh-v1.5   → 均衡选择(768维)
        - BAAI/bge-large-zh-v1.5  → 效果更好(1024维)
        - shibing624/text2vec-base-chinese → 轻量备选
        """
        print(f"加载 Embedding 模型:{model_name}")
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        print(f"模型加载完成,向量维度:{self.dimension}")

    def embed_text(self, text: str) -> List[float]:
        """单条文本向量化"""
        # BGE 模型建议加前缀提升检索效果
        prefixed = f"为这个句子生成表示以用于检索相关文章:{text}"
        vector = self.model.encode(prefixed, normalize_embeddings=True)
        return vector.tolist()

    def embed_query(self, query: str) -> List[float]:
        """查询向量化(与文档向量化保持一致)"""
        return self.embed_text(query)

    def embed_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        """批量向量化(提升效率)"""
        prefixed_texts = [
            f"为这个句子生成表示以用于检索相关文章:{t}" for t in texts
        ]
        vectors = self.model.encode(
            prefixed_texts,
            batch_size=batch_size,
            normalize_embeddings=True,
            show_progress_bar=True
        )
        return vectors.tolist()

🗄️ Step 4:Milvus 向量存储

# vector_store.py

from pymilvus import (
    MilvusClient,
    DataType,
    CollectionSchema,
    FieldSchema,
)
from typing import List, Dict
from config import Config
from document_loader import TextChunk


class MilvusVectorStore:

    def __init__(self):
        self.client = MilvusClient(
            uri=f"http://{Config.MILVUS_HOST}:{Config.MILVUS_PORT}"
        )
        self.collection_name = Config.COLLECTION_NAME
        self._init_collection()

    def _init_collection(self):
        """初始化集合(若不存在则创建)"""
        if self.client.has_collection(self.collection_name):
            print(f"集合 [{self.collection_name}] 已存在")
            return

        # 定义 Schema
        schema = self.client.create_schema(
            auto_id=False,
            enable_dynamic_field=True   # 允许动态字段
        )

        # 添加字段
        schema.add_field("id",      DataType.INT64,     is_primary=True)
        schema.add_field("vector",  DataType.FLOAT_VECTOR, dim=Config.VECTOR_DIM)
        schema.add_field("content", DataType.VARCHAR,   max_length=2000)
        schema.add_field("source",  DataType.VARCHAR,   max_length=500)
        schema.add_field("chunk_id",DataType.INT64)

        # 创建 HNSW 索引(高性能图索引)
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="HNSW",
            metric_type=Config.SEARCH_METRIC,
            params={"M": 16, "efConstruction": 200}
        )

        self.client.create_collection(
            collection_name=self.collection_name,
            schema=schema,
            index_params=index_params
        )
        print(f"✅ 集合 [{self.collection_name}] 创建成功")

    def insert_chunks(self, chunks: List[TextChunk], vectors: List[List[float]]):
        """批量插入文本块及其向量"""
        data = []
        for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
            data.append({
                "id":       i,
                "vector":   vector,
                "content":  chunk.content[:2000],   # 截断防超长
                "source":   chunk.source,
                "chunk_id": chunk.chunk_id,
            })

        # 分批插入,避免单次请求过大
        batch_size = 100
        total = 0
        for i in range(0, len(data), batch_size):
            batch = data[i: i + batch_size]
            self.client.insert(self.collection_name, batch)
            total += len(batch)
            print(f"  已插入 {total}/{len(data)} 条")

        print(f"✅ 共插入 {len(data)} 条向量数据")

    def search(self, query_vector: List[float], top_k: int = 5) -> List[Dict]:
        """向量相似度搜索"""
        results = self.client.search(
            collection_name=self.collection_name,
            data=[query_vector],
            limit=top_k,
            output_fields=["content", "source", "chunk_id"],
            search_params={"ef": 100}   # HNSW 搜索参数,越大越精确
        )

        hits = []
        for hit in results[0]:
            hits.append({
                "content":  hit["entity"]["content"],
                "source":   hit["entity"]["source"],
                "chunk_id": hit["entity"]["chunk_id"],
                "score":    hit["distance"],           # 相似度分数
            })
        return hits

    def get_stats(self) -> Dict:
        """获取集合统计信息"""
        stats = self.client.get_collection_stats(self.collection_name)
        return stats

🔍 Step 5:检索模块

# retriever.py

from typing import List, Dict
from embedder import Embedder
from vector_store import MilvusVectorStore
from config import Config


class Retriever:

    def __init__(self, embedder: Embedder, vector_store: MilvusVectorStore):
        self.embedder = embedder
        self.vector_store = vector_store

    def retrieve(self, query: str, top_k: int = None) -> List[Dict]:
        """
        检索与查询最相关的文档片段
        返回按相似度排序的结果列表
        """
        top_k = top_k or Config.TOP_K

        # 1. 查询向量化
        query_vector = self.embedder.embed_query(query)

        # 2. Milvus 相似度搜索
        results = self.vector_store.search(query_vector, top_k=top_k)

        # 3. 过滤低质量结果(余弦相似度 < 0.5 则丢弃)
        filtered = [r for r in results if r["score"] >= 0.5]

        if not filtered:
            print("⚠️ 未找到足够相关的文档片段")

        return filtered

    def format_context(self, retrieved_docs: List[Dict]) -> str:
        """将检索结果格式化为 LLM 上下文字符串"""
        if not retrieved_docs:
            return "(未找到相关文档)"

        context_parts = []
        for i, doc in enumerate(retrieved_docs, 1):
            context_parts.append(
                f"【参考片段 {i}】来源:{doc['source']}\n{doc['content']}"
            )
        return "\n\n".join(context_parts)

🤖 Step 6:LLM 生成模块

# generator.py

from openai import OpenAI
from typing import List, Dict
from config import Config


class Generator:

    def __init__(self):
        self.client = OpenAI(api_key=Config.OPENAI_API_KEY)

    def build_prompt(self, query: str, context: str) -> List[Dict]:
        """构建 RAG Prompt"""
        system_prompt = """你是一个专业的知识库问答助手。
请严格根据以下提供的参考文档来回答用户问题。

回答要求:
1. 只使用参考文档中的信息,不要编造内容
2. 如果文档中没有相关信息,请明确告知用户
3. 回答要简洁、准确、有条理
4. 可以适当引用原文,并注明来源"""

        user_prompt = f"""参考文档:
{context}

用户问题:{query}

请根据参考文档回答上述问题。"""

        return [
            {"role": "system", "content": system_prompt},
            {"role": "user",   "content": user_prompt},
        ]

    def generate(self, query: str, context: str) -> str:
        """调用 LLM 生成回答"""
        messages = self.build_prompt(query, context)

        response = self.client.chat.completions.create(
            model=Config.LLM_MODEL,
            messages=messages,
            max_tokens=Config.MAX_TOKENS,
            temperature=Config.TEMPERATURE,
        )

        return response.choices[0].message.content

    def generate_with_stream(self, query: str, context: str):
        """流式生成(逐字输出,提升用户体验)"""
        messages = self.build_prompt(query, context)

        stream = self.client.chat.completions.create(
            model=Config.LLM_MODEL,
            messages=messages,
            max_tokens=Config.MAX_TOKENS,
            temperature=Config.TEMPERATURE,
            stream=True,
        )

        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content

🔄 Step 7:RAG 主流程

# rag_pipeline.py

from document_loader import DocumentLoader
from embedder import Embedder
from vector_store import MilvusVectorStore
from retriever import Retriever
from generator import Generator
from config import Config


class RAGPipeline:

    def __init__(self):
        print("🚀 初始化 RAG Pipeline...")
        self.loader      = DocumentLoader(Config.CHUNK_SIZE, Config.CHUNK_OVERLAP)
        self.embedder    = Embedder(Config.EMBED_MODEL)
        self.vector_store = MilvusVectorStore()
        self.retriever   = Retriever(self.embedder, self.vector_store)
        self.generator   = Generator()
        print("✅ RAG Pipeline 初始化完成\n")

    # ── 知识库构建 ─────────────────────────────────────

    def build_knowledge_base(self, source_path: str):
        """
        构建知识库
        source_path: 文件路径 或 目录路径
        """
        import os
        print(f"📂 开始构建知识库:{source_path}")

        # 1. 加载并分块
        if os.path.isdir(source_path):
            chunks = self.loader.process_directory(source_path)
        else:
            chunks = self.loader.process_file(source_path)

        print(f"📄 共生成 {len(chunks)} 个文本块")

        # 2. 批量向量化
        print("🧠 开始向量化...")
        texts = [chunk.content for chunk in chunks]
        vectors = self.embedder.embed_batch(texts)

        # 3. 存入 Milvus
        print("🗄️ 存入 Milvus...")
        self.vector_store.insert_chunks(chunks, vectors)

        print(f"\n🎉 知识库构建完成!共收录 {len(chunks)} 个知识片段")

    # ── 问答查询 ───────────────────────────────────────

    def query(self, question: str, stream: bool = False) -> str:
        """
        RAG 问答主流程
        1. 检索相关文档
        2. 构建上下文
        3. LLM 生成回答
        """
        print(f"\n❓ 用户问题:{question}")
        print("-" * 50)

        # Step 1: 检索
        retrieved_docs = self.retriever.retrieve(question)

        if not retrieved_docs:
            return "抱歉,知识库中未找到与您问题相关的内容,请尝试换个方式提问。"

        print(f"🔍 召回 {len(retrieved_docs)} 个相关片段:")
        for i, doc in enumerate(retrieved_docs, 1):
            print(f"  [{i}] 相似度:{doc['score']:.4f} | {doc['source']}")

        # Step 2: 格式化上下文
        context = self.retriever.format_context(retrieved_docs)

        # Step 3: 生成回答
        print("\n🤖 生成回答...")
        if stream:
            # 流式输出
            full_answer = ""
            for token in self.generator.generate_with_stream(question, context):
                print(token, end="", flush=True)
                full_answer += token
            print()
            return full_answer
        else:
            answer = self.generator.generate(question, context)
            print(f"\n💬 回答:{answer}")
            return answer

    def get_stats(self):
        """查看知识库状态"""
        stats = self.vector_store.get_stats()
        print(f"📊 知识库统计:{stats}")

🚀 Step 8:运行入口

# main.py

from rag_pipeline import RAGPipeline

def main():
    # 初始化 RAG 系统
    rag = RAGPipeline()

    # ── 构建知识库 ──────────────────────────────
    # 支持单个文件或整个目录
    rag.build_knowledge_base("./documents")

    # ── 查看统计 ────────────────────────────────
    rag.get_stats()

    # ── 交互式问答 ──────────────────────────────
    print("\n" + "="*60)
    print("💬 知识库问答系统已就绪,输入 'quit' 退出")
    print("="*60)

    while True:
        question = input("\n请输入您的问题:").strip()

        if question.lower() in ["quit", "exit", "退出"]:
            print("再见!👋")
            break

        if not question:
            continue

        rag.query(question, stream=True)


if __name__ == "__main__":
    main()

📊 运行效果示例

🚀 初始化 RAG Pipeline...
加载 Embedding 模型:BAAI/bge-base-zh-v1.5
模型加载完成,向量维度:768
✅ RAG Pipeline 初始化完成

📂 开始构建知识库:./documents
正在处理:产品手册.pdf
  → 生成 128 个文本块
正在处理:技术文档.md
  → 生成 56 个文本块
📄 共生成 184 个文本块
🧠 开始向量化...
  已插入 100/184 条
  已插入 184/184 条
🎉 知识库构建完成!共收录 184 个知识片段

============================================================
💬 知识库问答系统已就绪
============================================================

请输入您的问题:产品的退款政策是什么?

❓ 用户问题:产品的退款政策是什么?
--------------------------------------------------
🔍 召回 3 个相关片段:
  [1] 相似度:0.9231 | ./documents/产品手册.pdf
  [2] 相似度:0.8876 | ./documents/产品手册.pdf
  [3] 相似度:0.7654 | ./documents/技术文档.md

🤖 生成回答...
根据产品手册,退款政策如下:
1. 购买后 7 天内可申请无理由退款...
2. 产品存在质量问题可在 30 天内申请退换货...

🔧 进阶优化方向

┌─────────────────────────────────────────────────────┐
│                  优化方向                            │
├──────────────┬──────────────────────────────────────┤
│ 检索优化      │ 混合检索(向量 + BM25关键词)          │
│              │ 重排序(Reranker)模型二次排序          │
│              │ HyDE 假设文档嵌入                      │
├──────────────┼──────────────────────────────────────┤
│ 分块优化      │ 语义分块(按段落/章节)                │
│              │ 父子块策略(小块检索,大块生成)        │
├──────────────┼──────────────────────────────────────┤
│ 生成优化      │ 引用溯源(标注答案来源)               │
│              │ 多轮对话历史管理                       │
├──────────────┼──────────────────────────────────────┤
│ 工程优化      │ 异步并发处理                           │
│              │ 向量缓存(Redis)                      │
│              │ 增量更新知识库                         │
└──────────────┴──────────────────────────────────────┘

需要深入某个模块吗?比如 混合检索实现Reranker 接入LangChain 版本重构,或是 多轮对话管理