如何在生成式AI里使用 Ray Data 进行大规模 RAG 应用的 Embedding Inference

文摘   2024-08-06 18:30   上海  



检索增强生成 (RAG,即Retrieval Augmented Generation) 是企业级生成式 AI(GenAI)应用的热门案例之一。多数 RAG 教程演示了如何利用 OpenAI API 结合 Embedding 模型和大语言模型(LLM)来进行推理(Inference)。然而,在开发过程中,如果能使用开源工具,就可以免去访问自己数据的费用,同时也能加快迭代。


在 Embedding 步骤(即将数据转换为向量的过程)中,使用 Ray Data 取得的性能提升尤为显著。相比于使用 Pandas,采用 Ray Data 等工具对批量推理请求进行汇总处理可以显著节省资源和处理时间。例如,在一台配备 16GB RAM 的 Mac M2 笔记本电脑上,仅使用四个 worker node,Ray Data 的处理速度就比 Pandas 快了 60 倍!本文将详细介绍使用 Milvus + Ray Data 进行 Embedding inference 的性能。


我们的开源 RAG 技术栈包括:

  • BGM-M3 Embedding 模型:该模型能一次性生成三种类型的向量,包括稀疏向量、稠密向量和多向量。

  • Ray Data:高效的分布式 Embedding inference 推理工具。

  • AWS S3:用于临时存储推理结果。

  • Milvus 或 Zilliz Cloud:用作向量数据库。


示例数据来源:Kaggle 的 IMDB 海报数据集。


01.

开源RAG技术栈


BGM-M3 Embedding 模型


BGM-M3 Embedding 模型是一种强大的多功能 Embedding 工具,其特点在于能够处理多语言(Multi-Linguality)、多功能(Multi-Functionality)和多粒度(Multi-Granularity)的数据,因此得名“M3”。该模型支持超过 100 种语言,并能计算三种常见的 Embedding 类型:稀疏向量、稠密向量和多向量。它还可以处理各种长度的文本——从短句到长文档,最多可支持 8192 个 token。更多详情,请参考论文或通过 HugginFace 网站了解此模型。(https://arxiv.org/abs/2402.03216)


Milvus 2.4 版本现已集成 BGM-M3 Embedding 模型。


Ray Data


有长期运行的数据转换任务?


Ray Data 具备可扩展的数据处理能力,能够便捷和快速地在多台机器(CPU、GPU 等)上并行处理大量数据。Ray Data 尤其适用于数据可以被分割,能够并行处理的场景,例如同时进行的多个数据块切分和 Embedding 转换的任务。Ray Data 底层是一个强大的流式执行引擎,能够最大化集群中的 GPU 利用率。与使用在线服务(如 OpenAI 嵌入 API)运行嵌入相比,使用 Ray Data 进行离线嵌入作业可以节省大部分成本。


Anyscale 是 Ray 的托管平台。您可以轻松地在 Anyscale 上利用 GPU 机器扩展 Embedding 任务。


Milvus 和 Zilliz Cloud


RAG应用之所以能够迅速响应,关键在于其背后强大的向量数据库。Milvus 专为处理大规模业务和海量数据设计。与其他向量数据库不同,Milvus 能够根据数据量的增长灵活地进行扩展。Milvus 的存储、索引和查询是独立的,可以单独垂直扩展或水平扩展,这一设计让 RAG 应用能够迅速响应用户的实时查询需求,同时在收到查询前和收到查询时,Milvus 能够智能地进行离线计算。除了强大的性能外,Milvus 还提供了多种企业级重要特性,如多租户和基于角色的访问控制(RBAC)、高可用等。


Zilliz Cloud 基于开源的 Milvus 搭建的全托管向量数据库云服务。


02.

设置RAG工具


本教程中将使用 Milvus 的 Python SDK、Ray Data、Amazon S3 和 Zilliz Cloud。


开始前,请先注册 AWS 账号以使用 Amazon S3。然后前往 console.aws.amazon.com > IAM > My security credentials > Create access key。复制并保存密钥(Access key 和 secret key)。


安装所需的工具库并运行 aws config。请在文件中输入 AWS 密码。

pip install boto3 pip install awscli –force-reinstall –upgradeaws config #fill in your key and secret keymore ~/.aws/credentials #make sure this looks correct


安装 Ray Data。

pip install -U "ray[data]"


安装 Pymilvus。

pip install -U pymilvus "pymilvus[model]" langchain


PyMilvus 2.4 版本及以上已打包 BGE-M3 embedding 模型。

import ray, os, pprint, time, boto3from langchain.text_splitter import RecursiveCharacterTextSplitterimport numpy as npimport pymilvusprint(pymilvus.__version__) # must be >= 2.4.0from pymilvus.model.hybrid import BGEM3EmbeddingFunction


如需使用 Zilliz,请先注册账号并创建集群。


03.

准备数据


本文使用了 Kaggle IMDB 海报数据作为数据集,其中包含大约 48,000 部电影、影评、海报链接以及一些元数据。


我们将所有文本字段(电影名称、描述、影评文本)复制到名为‘text’的一列中,并将其保存为 Parquet 格式,因为这种文件格式比 CSV 更高效。


04.

生成Embedding模型


根据以下步骤生成 Embedding 向量:

  1. 切分数据: 将输入文本切分为片段,将语义上相关的文本片段保存在一起。

  2. 调用推理(inference)模式下的 Embedding 模型,用于生成文本片段的 Embedding 向量。


Ray Data 可以并行处理以下数据操作请求:

  1. flat_map():切分数据(输出的行数将多于输入行数)。

  2. map_batches():调用 Embedding 模型。

chunk_size = 512chunk_overlap = np.round(chunk_size * 0.10, 0)
# Define a LangChain text splitter.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len) #len is a built-in Python function
# 1. Define a regular python function for chunking.def chunk_row(row, splitter=text_splitter):
# Copy the row columns into metadata. metadata = row.copy() del metadata['text'] # Remove text from metadata
# Split the text into chunks. chunks = splitter.create_documents( texts=[row["text"]], metadatas=[metadata]) chunk_list = [{ "text": chunk.page_content, **chunk.metadata} for chunk in chunks]
return chunk_list
# 2. Define a class with a callable method to compute embeddings.class ComputeEmbeddings: def __init__(self): # Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder. # https://huggingface.co/BAAI/bge-m3 self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu") print(f"dense_dim: {self.model.dim['dense']}") print(f"sparse_dim: {self.model.dim['sparse']}")
def __call__(self, batch):
# Ray data batch is a dictionary where values are array values. # BGEM3EmbeddingFunction input is docs as a list of strings. docs = list(batch['text'])
# Encode the documents. bge-m3 dense embeddings are already normalized. embeddings = self.model(docs) batch['vector_dense'] = embeddings['dense'] return batch
if __name__ == "__main__":
FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"
# Load and transform data.
ds = ray.data.read_parquet(FILE_PATH)
# Chunk the input text chunked_ds = ds.flat_map(chunk_row)
# Compute embeddings with a class that calls the embeddings model. embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)
# Save the embeddings to S3 in a folder of parquet part files.   embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')


运行前,需要提交 Ray 任务:

  1. 将代码保存至 Python 脚本文件。本例中将文件命名为 ray_data_demo.py。

  2. 在本地运行前,请先创建一个全新的路径,路径中只可包含 .py 脚本文件和 .parquet 数据文件。本例中将这个路径命名为ray_cluster。

  3. 运行 Python 脚本以启动 Ray 集群并自动提交任务。

  4. 打开 http://127.0.0.1:8265,查看集群和任务。



05.

Embedding速度提升60倍



表格展示了在 16GB M2 笔记本电脑上 Embedding 任务的用时。示例使用了 1 个单节点 Ray 集群用于批量处理任务,并发为 4。Pandas 用时更长,原因是 Pandas 只有一个处理器。但是 Ray Data 有 4 个处理器。Pandas 和 Ray在处理大型数据时性能都更出色。


06.

将数据从S3批量插入到Milvus或Zilliz Cloud之中


Milvus 和 Zilliz Cloud都支持批量插入(bulk insert)数据,可以直接从 AWS、GCP 或 Azure 对象存储中导入 Embedding 数据。除了界面外,Zilliz Cloud 还提供 RESTful API 和 SDK。


对于大量 Embedding 数据,使用 bulk insert 可以显著节省机器资源并缩短插入时间,与普通 insert 相比更为高效。此外,通过 bulk insert 构建的向量搜索索引比普通 insert 的索引更高效。


在 Zilliz Cloud 界面上仅需轻击几次鼠标便可轻松批量插入数据。首先,在集群中创建 1 个全新的 collection。创建时,需要开启 AutoID 和动态列,添加向量列,并设置向量列的向量维度。设置完毕后,点击“创建 Collection”。


接下来,点击“导入数据”并按照界面上的提示输入 parquet 文件的路径。(请注意,如果您的 S3 存储桶为非公开链接,您还需要输入 Access Key 和 Secret Key,以便 Zilliz Cloud 读取其中的数据)。Zilliz Cloud 支持 Amazon S3、Google Cloud Storage 或 Azure Blob Storage 等对象存储服务。点击“导入”开始将所有数据导入向量数据库 collection。


导入任务完成后,您可以选择在 collection 上构建索引,加速后续的向量搜索。



07.

查询数据


为测试新导入至 Collection 中的数据,让我们基于电影数据进行提问。

def mc_run_search(question, output_fields, top_k=2, filter_expression=""):     # Embed the question using the same encoder.   embeddings = model_bgem3([question])   query_embeddings = embeddings['dense']
# Run semantic vector search using your query and the vector database. results = mc.search( COLLECTION_NAME, data=query_embeddings, search_params=SEARCH_PARAMS, output_fields=output_fields, # Milvus can utilize metadata in boolean expressions to filter search. filter=filter_expression, limit=top_k, consistency_level="Eventually" )
# Assemble retrieved context and context metadata. # The search result is in the variable `results[0]`, which is type # 'pymilvus.orm.search.SearchResult'. METADATA_FIELDS = [f for f in output_fields if f != 'chunk'] formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context( results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k) return formatted_results, context, context_metadata
SAMPLE_QUESTION = "muybridge horse movie"
# Return top k unique results with HNSW index.TOP_K = 2
# Define output fields to return.OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]
formatted_results, context, context_metadata = \ mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)


以下为查询结果:


完整的 Ray Data 脚本可在https://github.com/milvus-io/bootcamp/blob/master/bootcamp/Integration/ray_data_demo.py  获取。


08.

总结


文本介绍了如何使用 Ray Data 和 Milvus/Zilliz Cloud 的批量插入功能大幅加速向量生成和加载过程。使用 Milvus 能够高效构建索引、节省计算资源、提升向量搜索速度。


推荐阅读



Zilliz
Simply The Fastest Vector Database for AI. Period.
 最新文章