算法打败文盲,我用向量数据库与RAG,做了个“鲁迅没说过”

文摘   2024-10-29 18:30   上海  

鲁迅说,“世上本没有路,走的人多了,也便成了路。”。

鲁迅说,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”

鲁迅还说,“猛兽总是独行,牛羊才成群结对。”

网络上流传着鲁迅说过的各种名言,我们不禁怀疑,鲁迅到底说没说?原文是什么样的,出处又是哪里?想回答这个问题,最好的办法就是搜索原文。但是,使用传统搜索方式,错了一个字可能就搜索不到,不如试试语义搜索吧。

我们可以把鲁迅作品集向量化,储存到向量数据库中。然后搜索某条据说是鲁迅说过的话,最后通过大模型组织语言输出回答,告诉我们鲁迅有没有说过这句话。如果有,再让它附上原文和出处。这个过程,就是 RAG(Retrieval-Augmented Generation,检索增强生成)。

而对于较长的本文,直接向量化会导致信息缺失,需要把文本分割成多个块,分别向量化。打个比方,如果一篇文章是一张图片,组成文章的块就是图片的像素点。文章分割成的块越多,意味着图片的像素点越多,分辨率越高,图片也就越清晰。我会介绍三种常见的分块方法,并且比较基于它们的向量搜索和 RAG 响应有什么区别。

图片来源:Photo by Master Unknown on Unsplash

01.

字数太多怎么向量化

《怎么把大白话“变成”古诗词》这篇文章中,我详细介绍了使用 Milvus 创建向量数据库的整个过程,相关内容我就不再赘述了,直接给出代码。

Milvus 版本:>=2.4.0

安装依赖。

!pip install "pymilvus[model]" torch

定义函数 vectorize_query 把文本向量化的函数。

import torch
import json
from pymilvus import DataType, MilvusClient
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

# 将输入的文本向量化
def vectorize_query(query , model_name = "BAAI/bge-small-zh-v1.5"):
    # 检查是否有可用的CUDA设备
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    # 根据设备选择是否使用fp16
    use_fp16 = device.startswith("cuda")
    # 创建嵌入模型实例
    bge_m3_ef = BGEM3EmbeddingFunction(
        model_name=model_name,
        device=device,
        use_fp16=use_fp16
    )
    # 把输入的文本向量化
    vectors = bge_m3_ef.encode_documents(query)
    return vectors

下一步就是把鲁迅作品集向量化了。但是且慢,让我们先看一下鲁迅作品集[^1]的文本格式:


        {
                "book""伪自由书"
                "title""最艺术的国家"
                "author""鲁迅"
                "type"""
                "source"""
                "date"""
                "content""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人... 
        }, 
        {
                "
book": "伪自由书", 
                "
title": "王道诗话", 
                "
author": "鲁迅", 
                "
type": "", 
                "
source": "", 
                "
date": "", 
                "
content": "《人权论》是从鹦鹉开头的,据说古时候有一只高飞远走的鹦哥儿... 
        }, 
        ...
]

文本中的“content”字段的值,就是一篇文章。有的文章字数多达几万字,用几百维的向量根本无法表达文章的语义细节。怎么办?就像前面说的,既然全文字数太多,我们就把文章切成几块,对每个块再做向量化。这个操作叫做“分块”。

02.

根据固定字数分块

最简单的分块方法是 fixed_chunk(固定分块),是按照字数分块,比如每隔150个字就分割一次。比如,对于《最艺术的国家》这篇文章使用 fixed_chunk,再通过 ChunkViz 把分块结果可视化,如下图所示:

我们用代码来实现 fixed_chunk

import json

# fixed_chunk
def fixed_chunk(
    input_file_path,
    output_file_path, 
    chunk_size, 
    field_name
    ):
    with open(input_file_path, 'r', encoding='utf-8') as file:
        data_list = json.load(file)
        chunk_data_list = []
        for data in data_list:
            # 获取指定字段的值
            text = data[field_name]
            # 对指定字段分割
            chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
            for idx, chunk in enumerate(chunks):
                chunk_data_list.append({
                    # 使用原始文章的 id 生成chunk的id
                    "id": f'{data["book"]}#{data["title"]}#chunk{idx}',
                    "book" : data["book"],
                    "title" : data["title"],
                    "author" : data["author"],
                    "type" : data["type"],
                    "source" : data["source"],
                    "date" : data["date"],
                    "chunk" : chunk,
                    # window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处
                    "window""",
                    "method""fixed_chunk"
                })
        with open(output_file_path, 'w', encoding='utf-8') as json_file:
            json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)

input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_fixed_chunk.json"
chunk_size = 150
field_name = "content"

fixed_chunk(input_file_path, output_file_path, chunk_size, field_name)

运行代码,得到 luxun_sample_fixed_chunk.json 文件,格式和上文中的可视化结果一致。

[
    {
        "id""伪自由书 #最艺术的国家 #chunk0 ",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...",
        "window""",
        "method""fixed_chunk"
    },
    {
        "id""伪自由书 #最艺术的国家 #chunk1 ",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""民国。然而这民国年久失修...",
        "window""",
        "method""fixed_chunk"
    },
    ...
]

你可能已经发现了,fixed_chunk 经常在句子中间分割,导致句子不连贯,语义的完整性被破坏。

03.

根据标点符号分割

怎么解决这个问题呢?我们可以在标点符号处分割。但是这还不够,因为这样分割的话,块与块之间仍然是相互独立的了,缺少关联。打个比方,如果看《生活大爆炸》这样的单元剧,我们跳着看也没关系,不影响理解剧情。但是如果看《天龙八部》这样的连续剧,上一集讲的还是段誉为救钟灵去万劫谷拿解药,下一集他就瞬移到了少室山,用六脉神剑大战慕容复。我们会一头雾水,这中间到底发生了什么?

所以,连续剧的开头有“前情提要”,结尾有“下集预告”。同样,为了保证块与块之间语义的连贯,我们也要设计一个“重叠”部分,让下一个块的开头部分,重复上一个块的结尾部分。

听起来很复杂?不用担心,我们可以使用 LlamaIndex[^2] 库轻松实现这种分块方法—— semantic_chunk

安装 LlamaIndex 库。

pip install llama_index==0.11.16

定义 semantic_chunk 分块函数。

# 导入SentenceSplitter用来分块
from llama_index.core.node_parser import SentenceSplitter

# 定义semantic_chunk分块函数
def semantic_chunk(
        input_file_path, 
        output_file_path, 
        # 块的大小
        chunk_size,
        # 重叠部分的大小。小于 chunk_overlap 的句子,就会作为重叠部分
        chunk_overlap,
        # 指定分块的字段
        field_name,
        ) :
    # 初始化 SentenceSplitter,设置分块的参数
    text_splitter = SentenceSplitter(
        # 指定段落分隔符
        paragraph_separator="\n\n\n",
        # 指定主要分隔符
        separator="。",
        # 指定次要分隔符
        secondary_chunking_regex="[^,.;、。:]+[,.;、。:]?",
        # 指定块的大小
        chunk_size=chunk_size, 
        # 指定重叠部分的大小
        chunk_overlap=chunk_overlap,
    )
    with open(input_file_path, 'r', encoding='utf-8') as file:
        data_list = json.load(file)
        chunk_data_list = []
        for data in data_list:
            text = data[field_name]
            chunks = text_splitter.split_text(text)
            for idx, chunk in enumerate(chunks):
                chunk_data_list.append({
                    # 使用原始文章的 id 生成chunk的id
                    "id": f'{data["book"]}#{data["title"]}#chunk{idx}',
                    "book" : data["book"],
                    "title" : data["title"],
                    "author" : data["author"],
                    "type" : data["type"],
                    "source" : data["source"],
                    "date" : data["date"],
                    "chunk" : chunk,
                    # window 字段在这里只是占位,没有实际作用,后面会详细介绍它的用处
                    "window""",
                    "method""semantic_chunk"
                })
        with open(output_file_path, 'w', encoding='utf-8') as json_file:
            json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)

# 示例:
input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_semantic_chunk.json"
chunk_size = 150
chunk_overlap = 20
field_name = "content"

semantic_chunk(
    input_file_path, 
    output_file_path, 
    chunk_size, 
    chunk_overlap,
    field_name
)

执行上面的代码,得到 luxun_sample_semantic_chunk.json 文件,我们来看一下分块的结果:

[
    {
        "id""伪自由书#最艺术的国家#chunk0",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人...中国的固有文化是科举制度,外加捐班之类。",
        "window""",
        "method""semantic_chunk"
    },
    {
        "id""伪自由书#最艺术的国家#chunk1",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""外加捐班之类。当初说这太不像民权...这对于民族是不忠,对于祖宗是不孝,",
        "window""",
        "method""semantic_chunk"
    },
    ...
]

果然是在我们设置的标点符号处分块的,而且附带重叠部分,这样就能保证块与块之间语义的连贯了。

04.

根据句子分块

对于上面的分块结果,你可能还不满意。虽然它根据标点符号分割,但是并不一定在句号处分割,无法保证句子的完整性。比如,对于这句话 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。可能分割成 我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。这艺术的可贵是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮” 两个块。

为了解决这个问题,又诞生了一种分块方法,它根据句子而不是字数分割,也就是说,根据“。”、“!”和“?”这三个表示句子结束的标点符号分割,而不会受到字数的限制。但是,这种分割方式怎么实现重叠的功能呢?这也简单,把整个句子作为重叠部分就行了,叫做“窗口句子”。这种分块方法叫做 sentence_window

比如,对于句子 ABCD,设置窗口大小为1,表示原始句子的左右各1个句子为“窗口句子”。分块如下:

第一个句子:A。窗口句子:B。因为第一个句子的左边没有句子。

第二个句子:B。窗口句子:A 和 C。

第三个句子:C。窗口句子:B 和 D。

第四个句子:D。窗口句子:C。因为最后一个句子的右边没有句子。

前面两种分块方法,都是对 chunk 字段向量化。而这种分块方法,除了对 chunk 字段(也就是原始句子)向量化外,还会把窗口句子作为原始句子的上下文,以元数据的形式储存在文件中。

原始句子用来做向量搜索,而在生成回答时,窗口句子和原始句子会一起传递给大模型。这样做的好处是,只向量化原始句子,节省了储存空间。提供窗口句子作为原始句子的上下文,可以帮助大模型理解原始句子的语境。

理解原理了,我们用代码来实现吧。

导入依赖。

import re
import json
from typing import List
from llama_index.core import Document
from llama_index.core.node_parser import SentenceWindowNodeParser

定义函数 split_text_into_sentences,用来分割中英文句子。

def split_text_into_sentences(text):
    # 使用正则表达式识别中英文句子结束符
    sentence_endings = re.compile(r'(?<=[。!?.!?])')
    sentences = sentence_endings.split(text)
    return [s.strip() for s in sentences if s.strip()]

定义函数 sentence_window,基于句子对文本分块。

# 定义sentence_window分块函数
def sentence_window(
        input_file_path, 
        output_file_path,
        field_name,
        window_size
    ):
    # 设置一个用于文本解析的节点解析器
    node_parser = SentenceWindowNodeParser.from_defaults(
        window_size=window_size,
        # 为窗口元数据指定一个键名为"window",用于在解析过程中存储窗口数据
        window_metadata_key="window",
        # 为原始文本元数据指定一个键名为"original_text",用于在解析过程中存储原始文本
        original_text_metadata_key="original_text",
        sentence_splitter = split_text_into_sentences
    )
    with open(input_file_path, 'r', encoding='utf-8') as file:
        data_list = json.load(file)
        chunk_data_list = []
        for data in data_list:
            text = data[field_name]
            # 将分割后的句子处理成节点。节点包含多个句子,类似于块
            document = Document(text=text)
            nodes = node_parser.get_nodes_from_documents([document])
            for idx, node in enumerate(nodes):
                chunk = node.metadata["original_text"]
                window = node.metadata["window"]
                chunk_data_list.append({
                    "id": f'{data["book"]}#{data["title"]}#chunk{idx}',
                    "book": data["book"],
                    "title": data["title"],
                    "author": data["author"],
                    "type": data["type"],
                    "source": data["source"],
                    "date": data["date"],
                    "chunk": chunk,
                    "window": window,
                    "method""sentence_window"
                })
        with open(output_file_path, 'w', encoding='utf-8') as json_file:
            json.dump(chunk_data_list, json_file, ensure_ascii=False, indent=4)

# 执行句子分块
input_file_path = "luxun_sample.json"
output_file_path = "luxun_sample_sentence_window.json"
field_name = "content"
window_size = 1

sentence_window(
    input_file_path,
    output_file_path,
    field_name,
    window_size
)

让我们来看下分块的结果,字段“chunk”是原始句子,“window”里面包含了原始句子和窗口句子。

[
    {
        "id""伪自由书#最艺术的国家#chunk0",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。",
        "window""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。",
        "method""sentence_window"
    },
    {
        "id""伪自由书#最艺术的国家#chunk1",
        "book""伪自由书",
        "title""最艺术的国家",
        "author""鲁迅",
        "type""",
        "source""",
        "date""",
        "chunk""这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。",
        "window""我们中国的最伟大最永久,而且最普遍的“艺术”是男人扮女人。 这艺术的可贵,是在于两面光,或谓之“中庸”---男人看见“扮女人”,女人看见“男人扮”。 表面上是中性,骨子里当然还是男的。",
        "method""sentence_window"
    },
    ...
]

05.

创建向量数据库

文本分块完成,接下来就是文本向量化,导入向量数据库了,这部分你应该比较熟悉了,我直接给出代码。

定义函数 vectorize_file,向量化 json 文件中指定的字段。

def vectorize_file(input_file_path, output_file_path, field_name):
    # 读取 json 文件,把chunk字段的值向量化
    with open(input_file_path, 'r', encoding='utf-8') as file:
        data_list = json.load(file)
        # 提取该json文件中的所有chunk字段的值
        query = [data[field_name] for data in data_list]
    # 向量化文本数据
    vectors = vectorize_query(query)
    # 将向量添加到原始文本中
    for data, vector in zip(data_list, vectors['dense']):
        # 将 NumPy 数组转换为 Python 的普通列表
        data['vector'] = vector.tolist()
    # 将更新后的文本内容写入新的json文件
    with open(output_file_path, 'w', encoding='utf-8') as outfile:
        json.dump(data_list, outfile, ensure_ascii=False, indent=4)

为了比较 RAG 使用不同分块方法的效果,我们把三个分块文件全部向量化。

# 向量化固定分块的文件
vectorize_file("luxun_sample_fixed_chunk.json""luxun_sample_fixed_chunk_vector.json""chunk"

# 向量化通过标点符号分块的文件
vectorize_file("luxun_sample_semantic_chunk.json""luxun_sample_semantic_chunk_vector.json""chunk"

# 向量化通过句子分块的文件
vectorize_file("luxun_sample_sentence_window.json","luxun_sample_sentence_window_vector.json""chunk"

接下来创建集合。为了能够在同一个集合中区分三种分块方法的搜索结果,我们设置参数 partition_key_field 的值为 method,它表示采用的分块方法。Milvus 会根据 method 字段的值,把数据插入到对应的分区中。打个比方,如果把集合看作一个 excel 文件,partition (分区)就是表格的工作表(Worksheet)。一个 excel 文件包含多张工作表,不同的数据填写在对应的工作表中。相应的,我们把不同的数据插入到对应分区中,搜索时指定分区,就可以提高搜索效率。

# 创建集合
from pymilvus import MilvusClient, DataType
import time

def create_collection(collection_name):
    # 检查同名集合是否存在,如果存在则删除
    if milvus_client.has_collection(collection_name):
        print(f"集合 {collection_name} 已经存在")
        try:
            # 删除同名集合
            milvus_client.drop_collection(collection_name)
            print(f"删除集合:{collection_name}")
        except Exception as e:
            print(f"删除集合时出现错误: {e}")
    # 创建集合模式
    schema = MilvusClient.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
        # 设置partition key
        partition_key_field = "method",
        # 设置分区数量,默认为16
        num_partitions=16,
        description=""
    )
    # 添加字段到schema
    schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=256)
    schema.add_field(field_name="book", datatype=DataType.VARCHAR, max_length=128)
    schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=128)
    schema.add_field(field_name="author", datatype=DataType.VARCHAR, max_length=64)
    schema.add_field(field_name="type", datatype=DataType.VARCHAR, max_length=64)
    schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=64)
    schema.add_field(field_name="date", datatype=DataType.VARCHAR, max_length=32)
    schema.add_field(field_name="chunk", datatype=DataType.VARCHAR, max_length=2048)
    schema.add_field(field_name="window", datatype=DataType.VARCHAR, default_value="", max_length=6144)
    schema.add_field(field_name="method", datatype=DataType.VARCHAR, max_length=32)
    schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=512)
    # 创建集合
    try:
        milvus_client.create_collection(
            collection_name=collection_name,
            schema=schema,
            shards_num=2
        )
        print(f"创建集合:{collection_name}")
    except Exception as e:
        print(f"创建集合的过程中出现了错误: {e}")
    # 等待集合创建成功
    while not milvus_client.has_collection(collection_name):
        # 获取集合的详细信息
        time.sleep(1)
    if milvus_client.has_collection(collection_name):
        print(f"集合 {collection_name} 创建成功")
    collection_info = milvus_client.describe_collection(collection_name)
    print(f"集合信息: {collection_info}")

collection_name = "LuXunWorks"
uri="http://localhost:19530"
milvus_client = MilvusClient(uri=uri)
create_collection(collection_name)

把数据插入到向量数据库。

from tqdm import tqdm
# 数据入库
def insert_vectors(file_path, collection_name, batch_size):
    # 读取和处理文件
    with open(file_path, 'r') as file:
        data = json.load(file)
    # 将数据插入集合
    print(f"正在将数据插入集合:{collection_name}")
    total_count = len(data)
    # pbar 是 tqdm 库中的一个进度条对象,用于显示插入数据的进度
    with tqdm(total=total_count, desc="插入数据") as pbar:
        # 每次插入 batch_size 条数据
        for i in range(0, total_count, batch_size):  
            batch_data = data[i:i + batch_size]
            res = milvus_client.insert(
                collection_name=collection_name,
                data=batch_data
            )
            pbar.update(len(batch_data))
    # 验证数据是否成功插入集合
    print(f"插入的实体数量: {total_count}")

# 设置每次插入的数据量
batch_size = 100
insert_vectors("luxun_sample_fixed_chunk_vector.json", collection_name, batch_size)
insert_vectors("luxun_sample_semantic_chunk_vector.json", collection_name, batch_size)
insert_vectors("luxun_sample_sentence_window_vector.json", collection_name, batch_size)

创建索引。我们使用倒排索引,首先创建索引参数。

index_params = milvus_client.prepare_index_params()

index_params.add_index(
    # 指定索引名称
    index_name="IVF_FLAT",
    # 指定创建索引的字段
    field_name="vector",
    # 设置索引类型
    index_type="IVF_FLAT",
    # 设置度量方式
    metric_type="IP",
    # 设置索引聚类中心的数量
    params={"nlist": 128}
)

接下来创建索引。

milvus_client.create_index(
    # 指定为创建索引的集合
    collection_name=collection_name,
    # 使用前面创建的索引参数创建索引
    index_params=index_params
)

验证下索引是否成功创建。查看集合的所有索引。

res = milvus_client.list_indexes(
    collection_name=collection_name
)
print(res)

返回我们创建的索引 ['IVF_FLAT']。再查看下索引的详细信息。

res = milvus_client.describe_index(
    collection_name=collection_name,
    index_name="IVF_FLAT"
)
print(res)

返回下面的索引信息,表示索引创建成功:

{'nlist''128''index_type''IVF_FLAT''metric_type''IP''field_name''vector''index_name''IVF_FLAT''total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state''Finished'}

接下来加载集合到内存。

print (f"正在加载集合:{collection_name}")
milvus_client.load_collection (collection_name=collection_name)

验证下加载状态。

print (milvus_client.get_load_state (collection_name=collection_name))

如果返回 {'state': <LoadState: Loaded>},说明加载完成。接下来,我们定义搜索函数。

先定义搜索参数。

search_params = {
    # 度量类型
    "metric_type""IP",
    # 搜索过程中要查询的聚类单元数量。增加nprobe值可以提高搜索精度,但会降低搜索速度
    "params": {"nprobe": 16}
}

再定义搜索函数。还记得前面我们在创建集合时,设置的 partition_key_field 吗?它会根据 method 字段的值,把数据插入到相应的分区中。而搜索函数中的 filter 参数,就是用来指定在哪个分区中搜索的。

def vector_search(
    query, 
    search_params,
    limit,
    output_fields,
    partition_name
):
# 将查询转换为向量
query_vectors = [vectorize_query(query)['dense'][0].tolist()]
# 向量搜索
res = milvus_client.search(
    collection_name=collection_name,
    # 指定查询向量
    data=query_vectors,
    # 指定搜索的字段
    anns_field="vector",
    # 设置搜索参数
    search_params=search_params,
    # 设置搜索结果的数量
    limit=limit,
    # 设置输出字段
    output_fields=output_fields,
    # 在指定分区中搜索
    filter=f"method =='{partition_name}'"
)
return res

再定义一个打印搜索结果的函数,方便查看。

# 打印向量搜索结果
def print_vector_results(res):
    # hit是搜索结果中的每一个匹配的实体
    res = [hit["entity"for hit in res[0]]
    for item in res:
        print(f"title: {item['title']}")
        print(f"chunk: {item['chunk']}")
        print(f"method: {item['method']}")
        print("-"*50)   

下面我们就来看一看,fixed_chunksemantic_chunksentence_window 三位选手在向量搜索上表现如何。首先搜索第一个句子:“世上本没有路,走的人多了,也便成了路。”。

# 比较不同分块方法产生的搜索结果
query1 = ["世上本没有路,走的人多了,也便成了路。"]
limit = 1
output_fields = ["title""chunk""window""method"]

# 定义分块方法列表
chunk_methods = ["fixed_chunk""semantic_chunk""sentence_window"]

# 定义一个函数来执行搜索并打印结果
def compare_chunk_methods(query, search_params, limit, output_fields, methods):
    for method in methods:
        res = vector_search(query, search_params, limit, output_fields, method)
        print(f"{method} 的搜索结果是:\n")
        print_vector_results(res)
        print("*" * 50)

# 调用函数进行比较
compare_chunk_methods(query1, search_params, limit, output_fields, chunk_methods)

搜索结果如下:

fixed_chunk 的搜索结果是:

title: 故乡
chunk: 的人多了,也便成了路。 一九二一年一月。 
method: fixed_chunk
--------------------------------------------------
semantic_chunk 的搜索结果是:

title: 六十六生命的路
chunk: 跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,
method: semantic_chunk
--------------------------------------------------
sentence_window 的搜索结果是:

title: 故乡
chunk: 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。
method: sentence_window
--------------------------------------------------

fixed_chunk 选手的确搜索到了原文,但是并不完整。这也是 fixed_chunk 分块的典型问题。

semantic_chunk 选手的表现让人失望,它并没有搜索到原文。但是它给了我们意外收获,搜索结果的意思和原文有些类似。也是向量数据库语义搜索功能的体现。

原文其实在这个块中:

我的愿望茫远罢了。我在朦胧中,眼前展开一片海边碧绿的沙地来,上面深蓝的天空中挂着一轮金黄的圆月。我想:希望本是无所谓有,无所谓无的。这正如地上的路;其实地上本没有路,走的人多了,也便成了路。一九二一年一月。"

semantic_chunk 选手没有搜索到它,可能是因为这个块的前半部分和查询句子的语义相差较远。这也反应了分块对搜索结果的影响。

最后出场的 sentence_window 选手,给出了标准答案:

这正如地上的路;其实地上本没有路,走的人多了,也便成了路。

恭喜 sentence_window 选手完美找到了原文。因为它基于句子分割,能够更好地保存句子的语义。当然,这样做也是有代价的。你可以比较下这三种分块方法向量化后的文件,luxun_sample_fixed_chunk_vector.json 的大小是11.5MPa,luxun_sample_semantic_chunk_vector.json 增加到了16.1MPa,而 luxun_sample_sentence_window_vector.json 则达到了49.2MPa,是前两者的3到4倍。

我们再来看看第二个句子,三位选手的表现如何:

“我家墙外有两株树,一株是枣树,还有一株也是枣树。”

fixed_chunk 选手给出的句子仍然不完整,但是包含了完整的原文:

在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知

semantic_chunk 选手这次正常发挥,也找到了原文:

在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,

sentence_window 选手依旧给出了完美答案:

在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。

虽然三位选手都找到了原文,但是 sentence_window 选手返回的原文不但完整,而且没有包含无关内容,减少了干扰信息。

再来看看最后一个句子:

“猛兽总是独行,牛羊才成群结对。”

fixed_chunk 选手找到了类似的句子,但是包含了较多的无关内容:

兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。其次要防说话。人能说话,已经是祸胎了,而况有时还要做文章。所以苍颉造字,夜有鬼哭。鬼且反对,而况于官?猴子不会说话

semantic_chunk 找到的则是另外一句完全不相关的句子:

和一些狐群狗党趁势来开除她私意所不喜的学生们么?而几个在“男尊女卑”的社会生长的男人们,此时却在异性的饭碗化身的面前摇尾,简直并羊而不如,羊,诚然的弱的,但还不至于如此,我敢给我所敬爱的羊们保证!但是,在黄金世界还未到来之前,

我们最后看看 sentence_window 选手的表现:

猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。

别忘了 sentence_window 选手除了搜索到的原始句子,还能提供“窗口句子”作为上下文:

# 查看`sentence_window`方法的窗口句子
method = "sentence_window"
res_sentence_window = vector_search(query3, search_params, limit, output_fields, method)
res_sentence_window = [hit["entity"for hit in res_sentence_window[0]]
for item in res_sentence_window:
    print(f"window: {item['window']}")

窗口句子如下:

window: 然亦可见至道嘉猷,人同此心,心同此理,固无华夷之限也。 猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。 人民与牛马同流,——此就中国而言,夷人别有分类法云,——治之之道,自然应该禁止集合:这方法是对的。

在 RAG 应用中,把上下文句子一起传递给大模型,能让大模型更好地理解句子的语义,作出更好的回答。

06.

调用大模型的 API

创建向量数据库这部分想必你已经轻车熟路了,下面我们来完成 RAG 应用的最后一个部分:生成。我们要把搜索到的句子传递给大模型,让它根据提示词重新组装成回答。

首先,我们要创建一个大模型的 api key,用来调用大模型。我使用的是 deepseek。为了保护 api key 的安全,把 api key 设置为环境变量“DEEPSEEK_API_KEY”。请把 <you_api_key> 替换成你自己的 api key。

import os
os.environ['DEEPSEEK_API_KEY'] = <you_api_key>

然后,再从环境变量中读取 api key。

deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")

deepseek 使用与 OpenAI 兼容的 API 格式,我们可以使用 OpenAI SDK 来访问 DeepSeek API。

# 安装 openai 库
pip install openai

接下来创建 openai 客户端实例。

# 导入openai库
from openai import OpenAI

# 导入os库
import os

# 创建openai客户端实例
OpenAI_client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com")

根据 deepseek api 文档的说明,定义生成响应的函数 generate_response。model 是我们使用的大模型,这里是 deepseek-chattemperature 决定大模型回答的随机性,数值在0-2之间,数值越高,生成的文本越随机;值越低,生成的文本越确定。

# 定义生成响应的函数
def generate_response(
        system_prompt, 
        user_prompt, 
        model, 
        temperature
    ):
    # 大模型的响应
    response = OpenAI_client.chat.completions.create(
        model=model,
        messages=[
            # 设置系统信息,通常用于设置模型的行为、角色或上下文。
            {"role""system""content": system_prompt},
            # 设置用户消息,用户消息是用户发送给模型的消息。
            {"role""user""content": user_prompt},
        ],
        # 设置温度
        temperature=temperature,  
        stream=True
    )
    # 遍历响应中的每个块
    for chunk in response:
        # 检查块中是否包含选择项
        if chunk.choices:
            # 打印选择项中的第一个选项的增量内容,并确保立即刷新输出
            print(chunk.choices[0].delta.content, end="", flush=True)

响应函数接收的参数中,system_prompt 是系统提示词,主要用于设置模型的行为、角色或上下文。你可以理解为这是系统给大模型的提示词,而且始终有效。我们可以使用下面的提示词规范大模型的响应:

system_prompt = "你是鲁迅作品研究者,熟悉鲁迅的各种作品。"

user_prompt 是用户提示词,是用户发给大模型的。大模型会在系统提示词和用户提示词的共同作用下,生成响应。用户提示词由查询句子 query 和向量数据库搜索到的句子组成。对于 fixed_chunksemantic_chunk,我们需要获取 chunk 字段的值。对于 sentence_window,我们需要获取 window 字段的值。定义下面的函数可以帮助我们方便获取想要的值。

def get_ref_info (query, search_params, limit, output_fields, method):
    res = vector_search (query, search_params, limit, output_fields, method)
    for hit in res[0]:
        ref_info = {
            "ref": hit["entity"]["window"if method == "sentence_window" else hit["entity"]["chunk"],
            "title": hit["entity"]["title"]
        }
    return ref_info

最后,针对不同的分块方法,获取对应的响应。

for method in chunk_methods:
    print(f"分块方法: {method}")
    # 获取参考信息
    ref_info = get_ref_info(query, search_params, limit, output_fields, method)
    # 生成用户提示词
    user_prompt = (
        f"请你根据提供的参考信息,查找是否有与问题语义相似的内容。参考信息:{ref_info}。问题:{query}。\n"
        f"如果找到了相似的内容,请回复“鲁迅的确说过类似的话,原文是[原文内容],这句话来自[文章标题]”。\n"
        f"[原文内容]是参考信息中ref字段的值,[文章标题]是参考信息中title字段的值。如果引用它们,请引用完整的内容。\n"
        f"如果参考信息没有提供和问题相关的内容,请回答“据我所知,鲁迅并没有说过类似的话。”"
)
    # 生成响应
    generate_response(system_prompt, user_prompt, model, temperature)
    print("\n" + "*" * 50 + "\n")

好啦,一切准备就绪,让我们看看使用不同分块方法的 RAG,究竟有什么区别。先看第一句话,“世上本没有路,走的人多了,也便成了路。”

分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。
**************************************************

分块方法: semantic_chunk
鲁迅的确说过类似的话,原文是“跨过了灭亡的人们向前进。什么是路?就是从没路的地方践踏出来的,从只有荆棘的地方开辟出来的。以前早有路了,以后也该永远有路。人类总不会寂寞,因为生命是进步的,是乐天的。昨天,我对我的朋友L说,“一个人死了,在死者自身和他的眷属是悲惨的事,”这句话来自《六十六生命的路》。
**************************************************

分块方法: sentence_window
鲁迅的确说过类似的话,原文是“我想:希望本是无所谓有,无所谓无的。 这正如地上的路;其实地上本没有路,走的人多了,也便成了路。 一九二一年一月。”,这句话来自《故乡》。

fixed_chunk 选手虽然给出了原文,但是遗憾的是不够完整。semantic_chunk 选手没有搜索到原文,但是给出的句子语义也和原文类似,算是意外收获。而 sentence_window 选手,则给出了标准答案。

再来看看第二句,“我家墙外有两株树,一株是枣树,还有一株也是枣树。”

分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,似乎自以为大有深意,而将繁霜洒在我的园里的野花草上。我不知”,这句话来自《秋夜》。
**************************************************

分块方法: semantic_chunk
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。他仿佛要离开人间而去,使人们仰面不再看见。然而现在却非常之蓝,闪闪地䀹着几十个星星的眼,冷眼。他的口角上现出微笑,”,这句话来自《秋夜》。
**************************************************

分块方法: sentence_window
鲁迅的确说过类似的话,原文是“在我的后园,可以看见墙外有两株树,一株是枣树,还有一株也是枣树。 这上面的夜的天空,奇怪而高,我生平没有见过这样的奇怪而高的天空。”,这句话来自《秋夜》。
**************************************************

三位选手表现差不多,sentence_window 选手给出的原文更精准。

最后来看看第三句,“猛兽总是独行,牛羊才成群结对。”

分块方法: fixed_chunk
鲁迅的确说过类似的话,原文是“兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。
**************************************************

分块方法: semantic_chunk
据我所知,鲁迅并没有说过类似的话。
**************************************************

分块方法: sentence_window
鲁迅的确说过类似的话,原文是“猛兽是单独的,牛羊则结队;野牛的大队,就会排角成城以御强敌了,但拉开一匹,定只能牟牟地叫。”,这句话来自《春末闲谈》。
**************************************************

fixed_chunk 选手虽然搜索结果包含了无关内容,但是大模型从中筛选出了合适的句子。semantic_chunk 选手搜索到的句子并没有被大模型采纳。sentence_window 选手仍然不负众望,给出了标准答案。请为 sentence_window 选手的精彩表现鼓掌。

07.

探索

其实,RAG 的响应和很多因素相关,你可以多多尝试,看看结果有什么不同。比如,修改 vector_search 函数的 limit 参数,让向量数据库多返回几个句子,增加命中概率。或者增加 generate_response 函数的 temperature 参数,看看 RAG 的响应如何变化。还有提示词,它直接影响大模型如何回答。

另外,你还可以基于本应用,开发其他功能,比如鲁迅作品智能问答功能,解答关于鲁迅作品的问题。或者鲁迅作品推荐功能,输入你想要阅读的作品类型,让 RAG 为你做推荐。玩法多多,祝你玩得开心。

08.

藏宝图

老规矩,推荐一些资料供你参考。

  • ChunkViz 是一个在线网站,提供分块可视化功能。(https://chunkviz.up.railway.app/)

  • 想了解 RAG 更多有趣应用,可以看看这个视频:《当我开发出史料检索RAG应用,正史怪又该如何应对?》(https://www.bilibili.com/video/BV1da4y1k78p/?spm_id_from=333.337.search-card.all.click&vd_source=ad92e3138da83a643ab3f5883c7664c7)。想了解更多技术细节,看这里:《揭秘「 B 站最火的 RAG 应用」是如何炼成的》(https://zilliz.com.cn/blog/milvus-rag-bilibili)

  • 想了解更多分块技术,可以阅读《检索增强生成(RAG)的分块策略指南》(https://zilliz.com.cn/blog/guide-to-chunking-sreategies-for-rag)和《从固定大小到NLP分块 - 文本分块技术的深入研究》(https://safjan.com/from-fixed-size-to-nlp-chunking-a-deep-dive-into-text-chunking-techniques/)两篇文章。

参考

[^1]: 鲁迅作品集数据基于 luxun_dataset (https://github.com/sun510001/luxun_dataset),增加了一些字段。luxun_sample.json 为鲁迅部分作品,方便试用。luxun.json 为完整的鲁迅作品集。

[^2]: LlamaIndex 是一个用于构建带有上下文增强功能的生成式 AI 应用的框架,支持大型语言模型(LLMs)。

代码文件

链接: https://pan.baidu.com/s/16nSrOh7jM0c6naB0JgsUhQ?pwd=1234 提取码: 1234

作者介绍

Zilliz 黄金写手:江浩


推荐阅读


Zilliz
Simply The Fastest Vector Database for AI. Period.
 最新文章