手把手系列 | 无需 OpenAI 即可搭建 RAG 应用(二)

文摘   2024-08-22 18:30   上海  
ChatGPT 在 2023 年将 AI 这一火热概念推至新高度。而如今开发人员有了更多的选择,不再局限于 OpenAI。

在本系列的前两篇文章内我们介绍了如何通过 OpenAI 之外的大语言模型(LLM)来构建检索增强生成(RAG)应用。本文将使用 BentoML 、OctoAI、Milvus 向量数据库搭建 RAG 应用。

01

使用BentoML生成Embedding向量


我们可以使用 BentoML 的Sentence Transformers Embeddings将句子转换为 Embedding 向量。在 BentoML 的 GitHub 代码仓库中可以看到 service.py 文件。这个文件主要用于启动服务器并设置 API Endpoint。通过 API Endpoint,能够加载 HuggingFace 的 all-MiniLM-L6-v2 模型,利用这个模型来生成向量。


我们需要先导入 bentoml 并使用 SyncHTTPClient 原生对象类型启动一个 HTTP 客户端。


import bentoml
bento_client = bentoml.SyncHTTPClient("http://localhost:3000")
连接到客户端后,需要创建一个函数,从字符串列表中获取 Embedding 列表。示例中我们一次性处理 25 个字符串,理由是分割字符串列表可以降低调用模型的成本,并避免超时。
然后,我们调用上面创建的 bento_client 来 encode 这些句子。BentoML 客户端返回一个向量列表。我们处理这些向量并将其添加到自己创建的空向量列表中。在这个loop的最后,我们返回最终的向量列表。
如果文本列表中的字符串不超过 25 个,我们只需调用客户端的 encode 方法对传入的字符串列表进行处理。
def get_embeddings(texts: list) -> list: if len(texts) > 25: splits = [texts[x:x+25] for x in range(0, len(texts), 25)] embeddings = [] for split in splits: embedding_split = bento_client.encode( sentences = split ) for embedding in embedding_split: embeddings.append(embedding) return embeddings return bento_client.encode( sentences=texts, )


02

将数据插入到用于 RAG 向量数据库中


现在,我们可以将数据插入到 Milvus 中。首先,需要启动并连接至 Milvus。在上方 GitHub 仓库链接中下载 docker-compose.yml 文件,您也可以在文档中获取 Milvus Docker Compose。


如果您已安装 Docker 并下载了该仓库,您可以直接运行 docker compose up -d 来启动 Milvus 服务。随后,导入 connections 模块并调用 connect,使用主机(localhost 或 127.0.0.1)和端口(19530)。

以下代码中还定义了两个常量: Collection 名称和向量维度。您可根据自己的偏好命名 Collection。但向量维度应与 Embedding 模型 all-MiniLM-L6-v2 生成的向量维度保持一致。

from pymilvus import connectionsCOLLECTION_NAME = "bmo_test"DIMENSION = 384connections.connect(host="localhost", port=19530)


03

创建 Milvus Collection


在 Milvus 中创建 Collection 主要分为两个步骤:定义 Schema 和创建索引。这个过程中需要导入 4 个模块:

  • FieldSchema :用于定义字段
  • CollectionSchema :用于定义 Collection Schema
  • DataType :用于定义字段数据类型
  • Collection :是 Milvus 用来创建 Collection 的对象。
您可以仔细设置Collection 中每个字段的 Schema,或者您也可以仅设置两个必要字段 id 和 embedding,然后通过 enabled_dynamic_field 开启动态列,后续可以动态根据数据结构插入未预先定义的字段。使用动态列,我们就能够像处理 NoSQL 数据库(如 MongoDB)那样处理插入至 Milvus 的数据。
接下来,我们只需使用之前给定的 Collection 名称和 Schema 创建 Collection。
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection

# id and embedding are required to definefields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)]# "enable_dynamic_field" lets us insert data with any metadata fieldsschema = CollectionSchema(fields=fields, enable_dynamic_field=True)# define the collection name and pass the schemacollection = Collection(name=COLLECTION_NAME, schema=schema)
Collection 创建完成后,需要创建索引。搜索时,索引定义了如何映射数据以便检索。在本文中,我们使用 HSNW来构建索引。我们还需要定义用于测量向量距离的相似度类型。本文中,我们使用内积(IP)。
Milvus 提供的 11 种索引类型,每种都包含不同的参数设置。对于 HNSW,我们有两个参数需要调整:“M”和“efConstruction”。"M" 是每个图中节点的上限,"efConstruction" 是在索引构建过程中使用的探索因子。
从实用角度来看,"M" 和 "efConstruction" 越高,搜索性能越好。但更高的 "M" 值意味着索引将占用更多内存。更高的 "efConstruction" 则意味着构建索引所需的时间将更长。我们推荐您通过实验找到最佳平衡值。
定义索引后,我们在选择的字段上创建索引——在本例中是 embedding 字段。然后我们调用 load 方法将 Collection 加载到内存中。
index_params = { "index_type": "HNSW", # one of 11 Milvus indexes "metric_type": "IP", # L2, Cosine, or IP "params": { "M": 8, # higher M = consumes more memory but better search quality "efConstruction": 64 # higher efConstruction = slower build, better search },}# pass the field to index on and the parameters to index withcollection.create_index(field_name="embedding", index_params=index_params)# load the collection into memorycollection.load()

将原始数据解析和转换为 Embedding 向量并插入至 Milvus。首先,需要准备好要插入的数据。本文中,我们有一堆 txt 文件存储在 GitHub 仓库的数据文件夹中。我们将这些数据分块,转换为 Embedding 向量,并存储在 Milvus 中。


让我们创建一个函数来将这些文本分块。主流分块方法众多,本例中我们选取最简单的方法。以下代码用于接受文件,将其作为字符串传入,然后在文本换行处进行分割,最终返回一个全新的字符串列表。

# naively chunk on newlinesdef chunk_text(filename: str) -> list: with open(filename, "r") as f: text = f.read() sentences = text.split("\n") return sentences

接下来,需要处理每个文件。我们获取所有文件名的列表,并创建一个空列表来保存分块信息。然后,我们遍历所有文件,并对每个文件运行上述函数,以对每个文件进行简单的分块处理。
在我们存储分块之前,需要先清理数据。分块结果中会有许多空行,因此我们需要创建一个空列表,并只存储长度超过一定字符数的分块。为了简单起见,我们将这个长度设置为 7 个字符。
清理数据完成后就可以将数据插入到 Milvus 中。我们需要创建一个字典,将每个分块列表与文档名称进行映射——本例以城市名称为例。然后,我们将这些内容添加到上面创建的空列表中。
import oscities = os.listdir("data")# store chunked text for each of the cities in a list of dictscity_chunks = []for city in cities: chunked = chunk_text(f"data/{city}") cleaned = [] for chunk in chunked: if len(chunk) > 7: cleaned.append(chunk) mapped = { "city_name": city.split(".")[0], "chunks": cleaned } city_chunks.append(mapped)
我们通过在每个分块列表上直接调用 get_embeddings 函数来实现这一点。现在,我们需要将它们匹配起来。由于 Embedding 向量列表和句子列表应该通过索引进行匹配,我们可以通过enumerate任一列表来进行匹配。
我们通过创建一个字典来匹配它们,该字典代表一个插入到 Milvus 的数据行。每行数据包含 Embedding 向量、相关句子和城市名称信息。包括城市名称为可选操作。需要注意的是,Entity 中无需包括 id 字段。这是因为我们在上面创建 schema 时开启了 Auto ID。
我们在循环过程中将每个条目添加到列表中。最后,我们得到了一个字典列表,每个字典代表 Milvus 中的单行条目。然后我们可以简单地将这些条目插入到我们的 Milvus Collection中。
最后一步是调用flush以便我创建索引。
entries = []for city_dict in city_chunks: embedding_list = get_embeddings(city_dict["chunks"]) # returns a list of lists # now match texts with embeddings and city name for i, embedding in enumerate(embedding_list): entry = {"embedding": embedding, "sentence": city_dict["chunks"][i], # poorly named cuz it's really a bunch of sentences, but meh "city": city_dict["city_name"]} entries.append(entry)collection.insert(entries)collection.flush()


04

设置LLM


您需要先创建一个 OctoAI 账号。当然,您也可以选择其他 LLM 替代 OctoAI。
我们先在代码中加载环境变量,获取 OctoAI API Token,并启动 OctoAI 服务。
from dotenv import load_dotenvload_dotenv()os.environ["OCTOAI_TOKEN"] = os.getenv("OCTOAI_API_TOKEN")from octoai.client import Clientocto_client = Client()


05‍

给 LLM 下指令


LLM 需要问题和上下文才能进行 RAG 任务。我们可以通过创建一个函数同时传递这两个参数,该函数接收两个字符串:问题和上下文。

本例中,我们使用 Nous Research 精调过的 Mixtral 模型。
我们给这个模型两个“消息”,指示。首先,我们给 LLM 发送一条消息,告诉它仅根据给定的上下文回答用户的问题。接下来,我们告诉它将有一个用户,并简单地传入问题。
其他参数用于调整模型行为。我们可以控制最大 token 数和模型的“创造性”程度。
然后,函数将从客户端返回 JSON 格式的输出。
def dorag(question: str, context: str): completion = octo_client.chat.completions.create( messages=[ {"role": "system","content": f"You are a helpful assistant. The user has a question. Answer the user question based only on the context: {context}" }, {"role": "user","content": f"{question}" } ], model="nous-hermes-2-mixtral-8x7b-dpo", max_tokens=512, presence_penalty=0, temperature=0.1, top_p=0.9, )return completion.model_dump()


06‍

RAG 示例


我们创建函数接收问题,然后使用 RAG 来回答问题。

我们使用同样的 Embedding 模型将问题转换为 Embedding 向量。然后使用这个向量在Milvus 中进行相似性搜索。
注意我们以列表格式将问题传递给 get_embeddings 函数,然后直接将输出的列表传递到我们的 Milvus 搜索的data部分。
在我们的搜索调用中,我们还需要提供一些其他参数。anns_field 用于定义在哪个字段上进行近似最近邻搜索(ANNS)。
我们还需要传入一些索引参数。确保相似度类型与我们用来创建索引的类型匹配,即 IP。
我们还必须使用匹配的索引参数——在本例中,是 ef。ef 越大,搜索时间越长,但召回率更高。 ef 最大值支持 2048。但本例中,我们将为了更快的搜索,将参数值设置为 16 。这个数据集中只有几千条数据。
接下来,我们还传递了一个 limit 参数,用于限制从 Milvus 返回的结果个数。本例中,limit=5。
最后一个搜索参数定义了我们想在搜索结果中返回哪些字段。本例中,我们只需要sentence。
我们需要进一步处理 Milvus 返回的搜索结果。Mivus 返回的 Entity 中包含 hits,所以我们要抓取 5 个 hits 中的句子 sentence,然后使用句号拼接形成一个段落。
接着,我们将用户问题和段落一同传入 dorag 函数,从而生成最终响应回答。
def ask_a_question(question): embeddings = get_embeddings([question]) res = collection.search( data=embeddings, # search for the one (1) embedding returned as a list of lists anns_field="embedding", # Search across embeddings param={"metric_type": "IP","params": {"ef": 16}}, limit = 5, # get me the top 5 results output_fields=["sentence"] # get the sentence/chunk and city ) sentences = []for hits in res:for hit in hits: sentences.append(hit.entity.get("sentence")) context = ". ".join(sentences)return dorag(question, context)print(ask_a_question("What state is Cambridge in?")["choices"][0]["message"]["content"])
我们简单提问“剑桥属于哪个州?”回答是“位于马塞诸塞州”。
推荐阅读







Zilliz
Simply The Fastest Vector Database for AI. Period.
 最新文章