混合检索实现:向量检索 + BM25 关键词检索
🎯 为什么需要混合检索?
┌─────────────────────────────────────────────────────────┐
│ 单一检索方式的局限性 │
├─────────────────────┬───────────────────────────────────┤
│ 纯向量检索 │ 纯 BM25 关键词检索 │
├─────────────────────┼───────────────────────────────────┤
│ ✅ 语义理解强 │ ✅ 精确关键词匹配 │
│ ✅ 同义词/近义词识别 │ ✅ 专有名词、数字精确匹配 │
│ ❌ 精确词匹配弱 │ ❌ 语义理解差 │
│ ❌ 专有名词易漏检 │ ❌ 同义词无法识别 │
└─────────────────────┴───────────────────────────────────┘
↓
混合检索 = 两者优势互补
召回率 ↑ 精准度 ↑
典型场景对比
🏗️ 混合检索架构
用户查询
│
├──────────────────┬──────────────────┐
↓ ↓ ↓
向量化 BM25 分词 (可选)
│ │ 精确过滤
↓ ↓
Milvus BM25 索引
向量检索 关键词检索
│ │
└────────┬─────────┘
↓
RRF 融合排序
(Reciprocal Rank Fusion)
↓
Top K 结果
↓
Reranker 精排(可选)
↓
最终召回结果
🛠️ 环境安装
pip install pymilvus[model] # Milvus + 内置模型支持
pip install rank_bm25 # BM25 实现
pip install jieba # 中文分词
pip install FlagEmbedding # BGE Reranker(可选)
📁 模块结构
hybrid_search/
├── config.py # 配置
├── bm25_retriever.py # BM25 检索器
├── vector_retriever.py # 向量检索器
├── rrf_fusion.py # RRF 融合算法
├── hybrid_retriever.py # 混合检索主模块
└── demo.py # 演示
⚙️ Step 1:配置文件
# config.py
class Config:
# Milvus
MILVUS_URI = "http://localhost:19530"
COLLECTION_NAME = "hybrid_kb"
VECTOR_DIM = 768
# 检索参数
VECTOR_TOP_K = 20 # 向量检索召回数(粗召回多一些)
BM25_TOP_K = 20 # BM25 召回数
FINAL_TOP_K = 5 # 最终返回数
# RRF 参数
RRF_K = 60 # RRF 平滑参数,通常取 60
# 权重配置(两路结果的权重)
VECTOR_WEIGHT = 0.7 # 语义检索权重
BM25_WEIGHT = 0.3 # 关键词检索权重
# Embedding 模型
EMBED_MODEL = "BAAI/bge-base-zh-v1.5"
# 相似度阈值
MIN_SCORE = 0.0 # RRF 无需阈值过滤
🔤 Step 2:BM25 检索器
# bm25_retriever.py
import jieba
import jieba.analyse
from rank_bm25 import BM25Okapi
from typing import List, Dict, Tuple
import pickle
import os
import re
class BM25Retriever:
"""
基于 BM25 算法的关键词检索器
BM25 公式:
score(q,d) = Σ IDF(qi) · (f(qi,d)·(k1+1)) / (f(qi,d) + k1·(1-b+b·|d|/avgdl))
"""
def __init__(self, index_path: str = "bm25_index.pkl"):
self.index_path = index_path
self.bm25 = None
self.corpus_tokens = [] # 分词后的语料
self.corpus_docs = [] # 原始文档列表
self._load_stopwords()
def _load_stopwords(self):
"""加载停用词"""
# 常用中文停用词
self.stopwords = {
'的', '了', '在', '是', '我', '有', '和', '就',
'不', '人', '都', '一', '一个', '上', '也', '很',
'到', '说', '要', '去', '你', '会', '着', '没有',
'看', '好', '自己', '这', '那', '他', '她', '它',
'与', '及', '或', '但', '而', '且', '如', '若',
}
def _tokenize(self, text: str) -> List[str]:
"""
中文分词 + 停用词过滤
使用 jieba 精确模式
"""
# 清洗文本
text = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', text)
# jieba 分词
tokens = jieba.cut(text, cut_all=False)
# 过滤停用词和单字(保留数字、英文)
filtered = [
t.strip() for t in tokens
if t.strip()
and t not in self.stopwords
and (len(t) > 1 or t.isdigit() or t.isalpha())
]
return filtered
def build_index(self, documents: List[Dict]):
"""
构建 BM25 索引
documents: [{"id": 0, "content": "...", "source": "..."}]
"""
print("🔨 构建 BM25 索引...")
self.corpus_docs = documents
self.corpus_tokens = [
self._tokenize(doc["content"])
for doc in documents
]
# 构建 BM25 模型
self.bm25 = BM25Okapi(
self.corpus_tokens,
k1=1.5, # 词频饱和参数,通常 1.2~2.0
b=0.75, # 文档长度归一化参数
)
# 持久化索引
self._save_index()
print(f"✅ BM25 索引构建完成,共 {len(documents)} 条文档")
def search(self, query: str, top_k: int = 20) -> List[Dict]:
"""
BM25 检索
返回 [{"doc": {...}, "score": float, "rank": int}]
"""
if not self.bm25:
raise RuntimeError("BM25 索引未构建,请先调用 build_index()")
# 查询分词
query_tokens = self._tokenize(query)
if not query_tokens:
return []
# 计算 BM25 分数
scores = self.bm25.get_scores(query_tokens)
# 排序取 Top K
top_indices = sorted(
range(len(scores)),
key=lambda i: scores[i],
reverse=True
)[:top_k]
results = []
for rank, idx in enumerate(top_indices):
if scores[idx] > 0: # 过滤零分结果
results.append({
"doc": self.corpus_docs[idx],
"score": float(scores[idx]),
"rank": rank + 1,
})
return results
def _save_index(self):
"""持久化 BM25 索引"""
with open(self.index_path, "wb") as f:
pickle.dump({
"bm25": self.bm25,
"corpus_tokens": self.corpus_tokens,
"corpus_docs": self.corpus_docs,
}, f)
def load_index(self) -> bool:
"""加载已有索引"""
if not os.path.exists(self.index_path):
return False
with open(self.index_path, "rb") as f:
data = pickle.load(f)
self.bm25 = data["bm25"]
self.corpus_tokens = data["corpus_tokens"]
self.corpus_docs = data["corpus_docs"]
print(f"✅ BM25 索引加载成功,共 {len(self.corpus_docs)} 条文档")
return True
🔍 Step 3:向量检索器
# vector_retriever.py
from pymilvus import MilvusClient, DataType
from sentence_transformers import SentenceTransformer
from typing import List, Dict
from config import Config
class VectorRetriever:
"""Milvus 向量检索器"""
def __init__(self):
self.client = MilvusClient(uri=Config.MILVUS_URI)
self.collection = Config.COLLECTION_NAME
self.embedder = SentenceTransformer(Config.EMBED_MODEL)
self._init_collection()
def _init_collection(self):
"""初始化向量集合"""
if self.client.has_collection(self.collection):
return
schema = self.client.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=Config.VECTOR_DIM)
schema.add_field("content", DataType.VARCHAR, max_length=2000)
schema.add_field("source", DataType.VARCHAR, max_length=500)
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 256}
)
self.client.create_collection(
collection_name=self.collection,
schema=schema,
index_params=index_params
)
print(f"✅ Milvus 集合 [{self.collection}] 创建成功")
def embed(self, text: str) -> List[float]:
"""文本向量化"""
prefixed = f"为这个句子生成表示以用于检索相关文章:{text}"
vector = self.embedder.encode(prefixed, normalize_embeddings=True)
return vector.tolist()
def insert(self, documents: List[Dict]):
"""批量插入文档"""
print("🧠 向量化并插入 Milvus...")
data = []
texts = [doc["content"] for doc in documents]
# 批量 Embedding
prefixed = [f"为这个句子生成表示以用于检索相关文章:{t}" for t in texts]
vectors = self.embedder.encode(
prefixed,
batch_size=32,
normalize_embeddings=True,
show_progress_bar=True
)
for i, (doc, vec) in enumerate(zip(documents, vectors)):
data.append({
"id": doc["id"],
"vector": vec.tolist(),
"content": doc["content"][:2000],
"source": doc["source"],
})
# 分批插入
for i in range(0, len(data), 100):
self.client.insert(self.collection, data[i:i+100])
print(f"✅ 已插入 {len(data)} 条向量")
def search(self, query: str, top_k: int = 20) -> List[Dict]:
"""
向量相似度检索
返回 [{"doc": {...}, "score": float, "rank": int}]
"""
query_vector = self.embed(query)
results = self.client.search(
collection_name=self.collection,
data=[query_vector],
limit=top_k,
output_fields=["content", "source"],
search_params={"ef": 200}
)
hits = []
for rank, hit in enumerate(results[0]):
hits.append({
"doc": {
"id": hit["id"],
"content": hit["entity"]["content"],
"source": hit["entity"]["source"],
},
"score": float(hit["distance"]),
"rank": rank + 1,
})
return hits
⚖️ Step 4:RRF 融合算法
# rrf_fusion.py
from typing import List, Dict
from config import Config
class RRFFusion:
"""
Reciprocal Rank Fusion(倒数排名融合)
核心公式:
RRF_score(d) = Σ weight_i / (k + rank_i(d))
其中:
k = 平滑参数(默认 60),防止高排名文档得分过高
rank = 文档在某路检索结果中的排名(从 1 开始)
weight = 该路检索的权重
优势:
- 不依赖原始分数的量纲,天然解决分数尺度不一致问题
- 对排名靠前的文档给予更高权重
- 简单高效,效果出色
"""
def __init__(self, k: int = None):
self.k = k or Config.RRF_K
def fuse(
self,
vector_results: List[Dict],
bm25_results: List[Dict],
vector_weight: float = None,
bm25_weight: float = None,
) -> List[Dict]:
"""
融合两路检索结果
Args:
vector_results: 向量检索结果列表
bm25_results: BM25 检索结果列表
vector_weight: 向量检索权重
bm25_weight: BM25 检索权重
Returns:
融合后按 RRF 分数排序的文档列表
"""
vw = vector_weight or Config.VECTOR_WEIGHT
bw = bm25_weight or Config.BM25_WEIGHT
# 文档得分字典 {doc_id: {"rrf_score": float, "doc": dict}}
score_map: Dict[int, Dict] = {}
# ── 向量检索结果累加 RRF 分数 ──────────────────
for item in vector_results:
doc_id = item["doc"]["id"]
rank = item["rank"]
rrf = vw / (self.k + rank)
if doc_id not in score_map:
score_map[doc_id] = {
"rrf_score": 0.0,
"vector_score": 0.0,
"bm25_score": 0.0,
"vector_rank": None,
"bm25_rank": None,
"doc": item["doc"],
}
score_map[doc_id]["rrf_score"] += rrf
score_map[doc_id]["vector_score"] = item["score"]
score_map[doc_id]["vector_rank"] = rank
# ── BM25 检索结果累加 RRF 分数 ─────────────────
for item in bm25_results:
doc_id = item["doc"]["id"]
rank = item["rank"]
rrf = bw / (self.k + rank)
if doc_id not in score_map:
score_map[doc_id] = {
"rrf_score": 0.0,
"vector_score": 0.0,
"bm25_score": 0.0,
"vector_rank": None,
"bm25_rank": None,
"doc": item["doc"],
}
score_map[doc_id]["rrf_score"] += rrf
score_map[doc_id]["bm25_score"] = item["score"]
score_map[doc_id]["bm25_rank"] = rank
# ── 按 RRF 分数排序 ────────────────────────────
fused = sorted(
score_map.values(),
key=lambda x: x["rrf_score"],
reverse=True
)
return fused
def explain(self, fused_results: List[Dict]) -> str:
"""输出融合结果的可解释报告"""
lines = ["=" * 60, "📊 RRF 融合结果分析", "=" * 60]
for i, r in enumerate(fused_results, 1):
v_rank = r["vector_rank"] or "未命中"
b_rank = r["bm25_rank"] or "未命中"
lines.append(
f"[{i}] RRF={r['rrf_score']:.4f} | "
f"向量排名={v_rank}(score={r['vector_score']:.3f}) | "
f"BM25排名={b_rank}(score={r['bm25_score']:.3f})\n"
f" 内容:{r['doc']['content'][:60]}..."
)
return "\n".join(lines)
🔀 Step 5:混合检索主模块
# hybrid_retriever.py
from typing import List, Dict, Optional
from bm25_retriever import BM25Retriever
from vector_retriever import VectorRetriever
from rrf_fusion import RRFFusion
from config import Config
class HybridRetriever:
"""
混合检索器
整合向量检索 + BM25 关键词检索 + RRF 融合
"""
def __init__(self):
print("🚀 初始化混合检索器...")
self.vector_retriever = VectorRetriever()
self.bm25_retriever = BM25Retriever()
self.rrf_fusion = RRFFusion()
# 尝试加载已有 BM25 索引
self.bm25_retriever.load_index()
print("✅ 混合检索器就绪\n")
# ── 知识库构建 ─────────────────────────────────────
def build_index(self, documents: List[Dict]):
"""
构建混合索引
documents: [{"id": int, "content": str, "source": str}]
"""
print(f"📚 开始构建混合索引,共 {len(documents)} 条文档")
# 并行构建两路索引
self.vector_retriever.insert(documents) # 向量索引
self.bm25_retriever.build_index(documents) # BM25 索引
print("🎉 混合索引构建完成!")
# ── 混合检索 ───────────────────────────────────────
def search(
self,
query: str,
top_k: int = None,
vector_weight: float = None,
bm25_weight: float = None,
explain: bool = False,
) -> List[Dict]:
"""
执行混合检索
Args:
query: 用户查询
top_k: 返回结果数
vector_weight: 向量检索权重(0~1)
bm25_weight: BM25 权重(0~1)
explain: 是否输出融合分析报告
Returns:
融合排序后的 Top K 文档列表
"""
top_k = top_k or Config.FINAL_TOP_K
vector_weight = vector_weight or Config.VECTOR_WEIGHT
bm25_weight = bm25_weight or Config.BM25_WEIGHT
print(f"\n🔍 混合检索:{query}")
print(f" 权重配置:向量={vector_weight}, BM25={bm25_weight}")
# ── 两路并行检索 ───────────────────────────────
vector_results = self.vector_retriever.search(
query, top_k=Config.VECTOR_TOP_K
)
bm25_results = self.bm25_retriever.search(
query, top_k=Config.BM25_TOP_K
)
print(f" 向量召回:{len(vector_results)} 条")
print(f" BM25 召回:{len(bm25_results)} 条")
# ── RRF 融合 ───────────────────────────────────
fused = self.rrf_fusion.fuse(
vector_results,
bm25_results,
vector_weight=vector_weight,
bm25_weight=bm25_weight,
)
# ── 输出分析报告 ───────────────────────────────
if explain:
print(self.rrf_fusion.explain(fused[:top_k]))
# 返回 Top K
final = fused[:top_k]
print(f" 融合后返回:{len(final)} 条")
return final
def search_with_mode(
self,
query: str,
mode: str = "hybrid", # "hybrid" | "vector" | "bm25"
top_k: int = None,
) -> List[Dict]:
"""
支持切换检索模式(便于 A/B 测试)
"""
top_k = top_k or Config.FINAL_TOP_K
if mode == "vector":
# 纯向量检索
results = self.vector_retriever.search(query, top_k=top_k)
return [{"doc": r["doc"], "rrf_score": r["score"]} for r in results]
elif mode == "bm25":
# 纯 BM25 检索
results = self.bm25_retriever.search(query, top_k=top_k)
return [{"doc": r["doc"], "rrf_score": r["score"]} for r in results]
else:
# 默认混合检索
return self.search(query, top_k=top_k)
def format_results(self, results: List[Dict]) -> str:
"""格式化检索结果为 LLM 上下文"""
if not results:
return "(未找到相关内容)"
parts = []
for i, r in enumerate(results, 1):
doc = r["doc"]
parts.append(
f"【参考片段 {i}】\n"
f"来源:{doc['source']}\n"
f"内容:{doc['content']}"
)
return "\n\n".join(parts)
🚀 Step 6:完整演示
# demo.py
from hybrid_retriever import HybridRetriever
def prepare_demo_data():
"""准备演示数据"""
return [
{
"id": 0,
"content": "GPT-4o 是 OpenAI 于 2024 年 5 月发布的多模态大语言模型,支持文本、图像、音频输入。",
"source": "AI模型介绍.txt"
},
{
"id": 1,
"content": "Milvus 是开源向量数据库,支持十亿级向量存储和毫秒级相似度搜索,广泛用于 AI 应用。",
"source": "数据库介绍.txt"
},
{
"id": 2,
"content": "退款政策:购买后 7 天内可申请无理由退款,产品质量问题 30 天内可退换货。",
"source": "服务协议.txt"
},
{
"id": 3,
"content": "RAG(检索增强生成)通过检索外部知识库来增强大语言模型的回答准确性,减少幻觉问题。",
"source": "技术文档.txt"
},
{
"id": 4,
"content": "向量数据库与传统数据库的区别:传统数据库存储结构化数据,向量数据库专注高维向量的相似度检索。",
"source": "技术对比.txt"
},
{
"id": 5,
"content": "BM25 是基于词频和逆文档频率的经典关键词检索算法,在信息检索领域被广泛应用。",
"source": "算法介绍.txt"
},
]
def run_comparison_test(retriever: HybridRetriever):
"""对比三种检索模式的效果"""
test_queries = [
"GPT-4o 什么时候发布的", # 精确名词 → BM25 更擅长
"如何退货退款", # 语义查询 → 向量更擅长
"向量数据库有什么优势", # 综合查询 → 混合最佳
]
for query in test_queries:
print("\n" + "=" * 70)
print(f"🔎 查询:{query}")
print("=" * 70)
for mode in ["vector", "bm25", "hybrid"]:
results = retriever.search_with_mode(query, mode=mode, top_k=2)
print(f"\n [{mode.upper():8}] Top1: {results[0]['doc']['content'][:50]}...")
def main():
# 初始化混合检索器
retriever = HybridRetriever()
# 构建索引
docs = prepare_demo_data()
retriever.build_index(docs)
# ── 基础混合检索 ────────────────────────────────────
print("\n" + "="*70)
print("🔍 基础混合检索示例")
print("="*70)
results = retriever.search(
query="向量数据库相似度搜索",
top_k=3,
explain=True # 输出融合分析
)
context = retriever.format_results(results)
print(f"\n📋 格式化上下文:\n{context}")
# ── 权重调节示例 ────────────────────────────────────
print("\n" + "="*70)
print("⚖️ 不同权重配置对比")
print("="*70)
query = "GPT-4o 发布时间"
configs = [
(1.0, 0.0, "纯向量"),
(0.0, 1.0, "纯BM25"),
(0.5, 0.5, "均等混合"),
(0.3, 0.7, "偏重BM25"), # 专有名词查询推荐
]
for vw, bw, label in configs:
results = retriever.search(
query,
top_k=1,
vector_weight=vw,
bm25_weight=bw
)
top1 = results[0]["doc"]["content"][:60] if results else "无结果"
print(f" [{label}] RRF={results[0]['rrf_score']:.4f} → {top1}...")
# ── A/B 对比测试 ────────────────────────────────────
run_comparison_test(retriever)
if __name__ == "__main__":
main()
📊 运行输出示例
🚀 初始化混合检索器...
✅ 混合检索器就绪
📚 开始构建混合索引,共 6 条文档
🧠 向量化并插入 Milvus...
✅ 已插入 6 条向量
🔨 构建 BM25 索引...
✅ BM25 索引构建完成,共 6 条文档
🎉 混合索引构建完成!
======================================================================
🔍 基础混合检索示例
======================================================================
🔍 混合检索:向量数据库相似度搜索
权重配置:向量=0.7, BM25=0.3
向量召回:3 条
BM25 召回:3 条
融合后返回:3 条
============================================================
📊 RRF 融合结果分析
============================================================
[1] RRF=0.0215 | 向量排名=1(score=0.912) | BM25排名=1(score=8.234)
内容:Milvus 是开源向量数据库,支持十亿级向量存储和毫秒级相似...
[2] RRF=0.0148 | 向量排名=2(score=0.876) | BM25排名=未命中(score=0.000)
内容:向量数据库与传统数据库的区别:传统数据库存储结构化数据...
[3] RRF=0.0089 | 向量排名=未命中 | BM25排名=2(score=5.123)
内容:BM25 是基于词频和逆文档频率的经典关键词检索算法...
🎛️ 权重动态调节策略
def auto_adjust_weights(query: str) -> tuple:
"""
根据查询特征自动调节权重
"""
import re
# 检测专有名词(大写字母、数字、型号等)
has_proper_noun = bool(re.search(r'[A-Z0-9]{2,}|GPT|API|v\d+\.\d+', query))
# 检测中文字符比例
chinese_ratio = len(re.findall(r'[\u4e00-\u9fff]', query)) / max(len(query), 1)
# 检测查询长度(长查询语义更丰富)
is_long_query = len(query) > 15
if has_proper_noun:
# 含专有名词 → 偏重 BM25
return 0.3, 0.7
elif is_long_query and chinese_ratio > 0.7:
# 长中文查询 → 偏重向量
return 0.8, 0.2
else:
# 默认均衡
return 0.6, 0.4
📈 效果评估指标
def evaluate_retrieval(retriever, test_cases):
"""
评估检索效果
test_cases: [{"query": str, "relevant_ids": [int]}]
"""
metrics = {"hit@1": 0, "hit@3": 0, "mrr": 0.0}
for case in test_cases:
results = retriever.search(case["query"], top_k=5)
result_ids = [r["doc"]["id"] for r in results]
relevant = set(case["relevant_ids"])
# Hit@1
if result_ids and result_ids[0] in relevant:
metrics["hit@1"] += 1
# Hit@3
if any(rid in relevant for rid in result_ids[:3]):
metrics["hit@3"] += 1
# MRR(平均倒数排名)
for rank, rid in enumerate(result_ids, 1):
if rid in relevant:
metrics["mrr"] += 1.0 / rank
break
n = len(test_cases)
print(f"Hit@1: {metrics['hit@1']/n:.2%}")
print(f"Hit@3: {metrics['hit@3']/n:.2%}")
print(f"MRR: {metrics['mrr']/n:.4f}")
🔮 进阶:接入 Reranker 精排
from FlagEmbedding import FlagReranker
class RerankerPostProcessor:
"""
使用 BGE Reranker 对混合检索结果二次精排
精排模型比 Embedding 模型更精准,但速度较慢
→ 先粗召回 20 条,再精排取 Top 5
"""
def __init__(self, model_name="BAAI/bge-reranker-base"):
self.reranker = FlagReranker(model_name, use_fp16=True)
def rerank(self, query: str, docs: List[Dict], top_k: int = 5) -> List[Dict]:
"""对混合检索结果重新排序"""
pairs = [(query, d["doc"]["content"]) for d in docs]
scores = self.reranker.compute_score(pairs, normalize=True)
for doc, score in zip(docs, scores):
doc["rerank_score"] = score
reranked = sorted(docs, key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k]
📌 最佳实践总结
┌─────────────────────────────────────────────────────────┐
│ 混合检索最佳实践 │
├─────────────────┬───────────────────────────────────────┤
│ 粗召回阶段 │ 向量+BM25 各召回 20 条,共 40 条候选 │
├─────────────────┼───────────────────────────────────────┤
│ 融合排序 │ RRF 算法融合,权重按查询类型动态调节 │
├─────────────────┼───────────────────────────────────────┤
│ 精排阶段(可选) │ Reranker 模型对 Top 20 精排取 Top 5 │
├─────────────────┼───────────────────────────────────────┤
│ 权重建议 │ 通用:向量0.7/BM25 0.3 │
│ │ 专有名词多:向量0.3/BM25 0.7 │
│ │ 长语义查询:向量0.8/BM25 0.2 │
├─────────────────┼───────────────────────────────────────┤
│ 中文优化 │ jieba 分词 + 停用词过滤 │
│ │ BGE 中文 Embedding 模型 │
└─────────────────┴───────────────────────────────────────┘
接下来想深入哪个方向?
🎯 Reranker 精排完整实现
💬 多轮对话历史管理
📈 检索评估体系搭建(RAGAS 框架)
⚡ 异步并发优化