在本系列的前两篇文章内我们介绍了如何通过 OpenAI 之外的大语言模型(LLM)来构建检索增强生成(RAG)应用。本文将使用 BentoML 、OctoAI、Milvus 向量数据库搭建 RAG 应用。
01
使用BentoML生成Embedding向量
我们可以使用 BentoML 的Sentence Transformers Embeddings将句子转换为 Embedding 向量。在 BentoML 的 GitHub 代码仓库中可以看到 service.py 文件。这个文件主要用于启动服务器并设置 API Endpoint。通过 API Endpoint,能够加载 HuggingFace 的 all-MiniLM-L6-v2 模型,利用这个模型来生成向量。
我们需要先导入 bentoml 并使用 SyncHTTPClient 原生对象类型启动一个 HTTP 客户端。
import bentoml
bento_client = bentoml.SyncHTTPClient("http://localhost:3000")
def get_embeddings(texts: list) -> list: if len(texts) > 25: splits = [texts[x:x+25] for x in range(0, len(texts), 25)] embeddings = [] for split in splits: embedding_split = bento_client.encode( sentences = split ) for embedding in embedding_split: embeddings.append(embedding) return embeddings return bento_client.encode( sentences=texts, )
02
将数据插入到用于 RAG 向量数据库中
现在,我们可以将数据插入到 Milvus 中。首先,需要启动并连接至 Milvus。在上方 GitHub 仓库链接中下载 docker-compose.yml 文件,您也可以在文档中获取 Milvus Docker Compose。
如果您已安装 Docker 并下载了该仓库,您可以直接运行 docker compose up -d 来启动 Milvus 服务。随后,导入 connections 模块并调用 connect,使用主机(localhost 或 127.0.0.1)和端口(19530)。
以下代码中还定义了两个常量: Collection 名称和向量维度。您可根据自己的偏好命名 Collection。但向量维度应与 Embedding 模型 all-MiniLM-L6-v2 生成的向量维度保持一致。
from pymilvus import connectionsCOLLECTION_NAME = "bmo_test"DIMENSION = 384connections.connect(host="localhost", port=19530)
03
创建 Milvus Collection
在 Milvus 中创建 Collection 主要分为两个步骤:定义 Schema 和创建索引。这个过程中需要导入 4 个模块:
FieldSchema :用于定义字段 CollectionSchema :用于定义 Collection Schema DataType :用于定义字段数据类型 Collection :是 Milvus 用来创建 Collection 的对象。
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
# id and embedding are required to define
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
]
# "enable_dynamic_field" lets us insert data with any metadata fields
schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
# define the collection name and pass the schema
collection = Collection(name=COLLECTION_NAME, schema=schema)
index_params = {
"index_type": "HNSW", # one of 11 Milvus indexes
"metric_type": "IP", # L2, Cosine, or IP
"params": {
"M": 8, # higher M = consumes more memory but better search quality
"efConstruction": 64 # higher efConstruction = slower build, better search
},
}
# pass the field to index on and the parameters to index with
collection.create_index(field_name="embedding", index_params=index_params)
# load the collection into memory
collection.load()
将原始数据解析和转换为 Embedding 向量并插入至 Milvus。首先,需要准备好要插入的数据。本文中,我们有一堆 txt 文件存储在 GitHub 仓库的数据文件夹中。我们将这些数据分块,转换为 Embedding 向量,并存储在 Milvus 中。
让我们创建一个函数来将这些文本分块。主流分块方法众多,本例中我们选取最简单的方法。以下代码用于接受文件,将其作为字符串传入,然后在文本换行处进行分割,最终返回一个全新的字符串列表。
# naively chunk on newlines
def chunk_text(filename: str) -> list:
with open(filename, "r") as f:
text = f.read()
sentences = text.split("\n")
return sentences
import os
cities = os.listdir("data")
# store chunked text for each of the cities in a list of dicts
city_chunks = []
for city in cities:
chunked = chunk_text(f"data/{city}")
cleaned = []
for chunk in chunked:
if len(chunk) > 7:
cleaned.append(chunk)
mapped = {
"city_name": city.split(".")[0],
"chunks": cleaned
}
city_chunks.append(mapped)
entries = []
for city_dict in city_chunks:
embedding_list = get_embeddings(city_dict["chunks"]) # returns a list of lists
# now match texts with embeddings and city name
for i, embedding in enumerate(embedding_list):
entry = {"embedding": embedding,
"sentence": city_dict["chunks"][i], # poorly named cuz it's really a bunch of sentences, but meh
"city": city_dict["city_name"]}
entries.append(entry)
collection.insert(entries)
collection.flush()
04
设置LLM
from dotenv import load_dotenvload_dotenv()os.environ["OCTOAI_TOKEN"] = os.getenv("OCTOAI_API_TOKEN")from octoai.client import Clientocto_client = Client()
05
给 LLM 下指令
LLM 需要问题和上下文才能进行 RAG 任务。我们可以通过创建一个函数同时传递这两个参数,该函数接收两个字符串:问题和上下文。
def dorag(question: str, context: str): completion = octo_client.chat.completions.create( messages=[ {"role": "system","content": f"You are a helpful assistant. The user has a question. Answer the user question based only on the context: {context}" }, {"role": "user","content": f"{question}" } ], model="nous-hermes-2-mixtral-8x7b-dpo", max_tokens=512, presence_penalty=0, temperature=0.1, top_p=0.9, )return completion.model_dump()
06
RAG 示例
我们创建函数接收问题,然后使用 RAG 来回答问题。
def ask_a_question(question): embeddings = get_embeddings([question]) res = collection.search( data=embeddings, # search for the one (1) embedding returned as a list of lists anns_field="embedding", # Search across embeddings param={"metric_type": "IP","params": {"ef": 16}}, limit = 5, # get me the top 5 results output_fields=["sentence"] # get the sentence/chunk and city ) sentences = []for hits in res:for hit in hits: sentences.append(hit.entity.get("sentence")) context = ". ".join(sentences)return dorag(question, context)print(ask_a_question("What state is Cambridge in?")["choices"][0]["message"]["content"])