RisingWave 用户定义函数 (二): Rust x WebAssembly

文摘   科技   2024-06-27 15:10   北京  

作者: 王润基|RisingWave Labs 内核开发工程师

RisingWave 支持用户使用 Rust 语言编写自定义函数,Rust UDF 会被编译到 WebAssembly,一种最初诞生于浏览器之中的虚拟机汇编语言。随后,这些 UDF 会在 RisingWave 进程中内嵌的 WebAssembly 虚拟机上被即时编译(JIT)执行。这种执行方式效率很高,相比原生指令只有少量性能损失,同时避免了远程通信带来的高延迟。因此,Rust UDF 非常适合编写计算密集型逻辑在这篇文章中,我们将介绍 RisingWave Rust UDF 的设计与实现。

1应用场景

为什么要开发 Rust UDF 呢?事实上,RisingWave 首先支持的是 Python 语言的 UDF Server。Python 语言具有最广泛的用户群体,开发效率也很高。但是,随着越来越多的用户开始使用 Python UDF,我们也意识到它存在一些难以解决的痛点:

  • 最主要的是性能问题因为 Python 本身的动态性和解释执行的特性,它是所有主流编程语言中运行速度最慢的一个。稍微复杂一点的处理逻辑都会运行很长时间。

  • 其次,数据库和 UDF Server 通过远程调用(RPC)的方式传递数据,一来一回会引入相当大的延迟(通常在毫秒级)。从而拖慢整个数据流的响应速度。

  • UDF Server 虽然非常灵活,但是用户需要自己额外部署和运维这套服务。如果负载比较大,还需要自己做负载均衡。而它不受 RisingWave 的管理,向系统中引入了很多不可控因素。

其实,大部分用户使用 UDF 的目的仅仅是实现一些内置函数不支持的处理逻辑,不值得为此承担额外的运维压力和性能风险。因此,我们需要一个高性能的、在进程中就地执行的 UDF 方案。

能达到高性能要求的编程语言有 C/C++/Rust/Go 等。我们选择了 Rust,一方面是因为它是 RisingWave 本身的编程语言,另一方面也因为它在 Arrow 和 WebAssembly 中有着成熟的生态。

在进程中就地执行,还对运行环境的隔离性有很强要求。因为用户可以编写任意代码,但无论它怎么写都不能影响 RisingWave 本身的运行。这就要求运行环境具备一定的沙盒特性,能够限制不可信代码的 CPU 和内存使用,对外部的访问。因此我们不能对 UDF 编译出的指令原生运行,而需要将其编译到一种虚拟机指令,通过 JIT 方式运行。WebAssembly 就是一个非常合适的选择。很多 WebAssembly 解释器都是用 Rust 写的,它们可以非常方便地嵌入到 RisingWave 中。

因此,我们最终选择了 Rust 作为编程语言,将 WebAssembly 作为执行环境。

2用户接口

内嵌 Rust 代码

要在 RisingWave 中创建 Rust UDF,只需通过 create function 命令定义函数名和数据类型,然后嵌入一段有着相同签名的 Rust 函数即可。

create function gcd(intintreturns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

对于表函数(Table Function),由于 Rust 的生成器(generator)特性尚未稳定,我们要求 Rust 函数返回一个迭代器(iterator),每次返回一行元素:

create function range(n intreturns table (x intlanguage rust as $$
    fn range(n: i32) -> impl Iterator<Item = i32> {
        (0..n).into_iter()
    }
$$;

这些 Rust 代码会在前端被编译成 WebAssembly 模块,然后在后端的运行时环境中执行。

上传 WebAssembly 模块

如果函数的实现比较复杂,比如需要依赖第三方库或者有多个文件、几行 SQL 写不下,用户也可以创建一个独立的 Rust 项目。通过我们提供的框架,自行编译出 WebAssembly 模块。最后直接把 WebAssembly 模块上传到 RisingWave 中运行。

比如上一篇中提到的使用 prost 库解析 protobuf 的场景:

// lib.rs
use arrow_udf::{function, types::StructType}; // 这是我们提供的 UDF 框架
use prost::{DecodeError, Message};            // 这是用户依赖的第三方库

// 导入从 .proto 生成的 Rust 代码
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}

// 定义返回结构体
#[derive(StructType)]
struct DataKey {
    stream: String,
    pan: String,
}

// 定义解析函数
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
    let data_key = proto::DataKey::decode(data)?;
    Ok(DataKey {
        stream: data_key.stream,
        pan: data_key.pan,
    })
}

最终以 base64 编码的方式将编译出的模块导入 RisingWave。

\set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';

注意这里的 language 已变成 wasm 而不是 rust。因为实际输入的是 wasm 指令。未来我们可能还会支持更多语言编译到 WebAssembly,而这些原始的编程语言是不会被 RisingWave 感知到的。

3内部实现

介绍完了用户接口,让我们来看看 RisingWave 背后都做了哪些事情。

前端编译

当接收到一条 create function ... language rust ... 语句时,RisingWave 前端会在本地临时文件夹中生成一个 Rust 项目。语句中内嵌的 Rust 代码片段会被提取出来,并补充上必要的辅助代码。

例如,对于如下函数:

create function gcd(intintreturns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

前端会为它补充 use 语句和 SQL 签名,生成这样的 lib.rs 文件:

use arrow_udf::{function, types::StructType}; // import prelude

#[function("gcd(int, int) -> int")]
fn gcd(mut a: i32mut b: i32) -> i32 {
    while b != 0 {
        (a, b) = (b, a % b);
    }
    a
}

之后,前端会用 cargo release 模式编译出 .wasm 文件,并尝试通过 wasm-strip 去除其中的符号信息,降低二进制大小(通常可以降低到 1MB 左右)。

后面的工作和处理 language wasm 的情形一样。wasm 文件会被压缩后作为函数定义的一部分存储到 catalog 元数据中,等实际执行时再读取出来解压缩。

编译用户函数

那么这个函数到底是怎样被编译成可以实际执行的 WebAssembly 模块呢?

如果你曾看过我们之前介绍内置函数框架[1]的文章,就会发现它们之间存在非常相似的设计。因为它们都是用 Rust 实现 SQL 函数,没有本质区别。因此我们复用了同一套基于过程宏的代码生成框架,最终将用户函数包起来,生成基于 Apache Arrow 的列式求值函数:

fn gcd_eval(input: &RecordBatch) -> Result<RecordBatch> {...}

此时,如果它是内置函数的话,就已经能被 Rust 调用执行。但由于我们要编译到 WebAssembly,而 WebAssembly 解释器只能通过标准的 C ABI 来调用其中的函数(Rust 至今没有稳定的 ABI)。因此我们还需要做一些额外的工作,将其包装成可以被 FFI 调用的 C 函数。最终生成的代码长这个样子:

#[export_name = "arrowudf_Base64EncodedSignature"]
unsafe extern "C" fn gcd_int4_int4_int4_ffi(
  ptr: *const u8,
  len: usize,
  out: *mut arrow_udf::ffi::CSlice,
) -> i32 {
  // decode input RecordBatch from the buffer specified by `ptr` and `len`
  let input: RecordBatch = ...;
  
  // call Rust function
  let result = gcd_eval(&input);
  
  // encode output RecordBatch or error message to `out`
  match result {
    Ok(o) => { ...; 0 }
    Err(e) => { ...; -1 }
  }
}

其中 ptr len 表示一个输入 buffer,buffer 中包含了以文件格式编码的输入 RecordBatch这一编码是有标准格式的,因此可以作为 ABI 的一部分。相应地,out 是这个函数的返回值,表示一个由内部分配内存的 buffer,其中包含以同样格式编码的输出RecordBatch。这个 buffer 需要由调用者在读取完成后手动回收。因此 WebAssembly 模块还需要暴露自己的 alloc dealloc 函数。最后函数返回值是错误码,0 表示成功,-1 表示出错。如果出错,那么 out 指向的 buffer 中包含了错误信息以供读取。

比较有意思的一点设计是这个函数的符号名,#[export_name] 中的字符串。当 UDF 加载器拿到这个 .wasm 文件时,需要首先从某处读取所有 UDF 的元数据信息。元数据会告诉它这个模块包含哪些函数,每个函数的签名是什么,以及该从哪个位置调用。第一个和第三个问题很简单。因为每个 extern "C" fn 会在模块的导出符号表(symbol table)中出现。至于第二个问题,我们可以将函数签名(即 "gcd(int4, int4) -> int4")以字符串的形式编码在符号名中。但由于符号名中不能出现括号和空格等字符,我们还需要将字符串通过 base64 重新编码到合法字符,并加上 arrowudf_ 的固定前缀。这样 UDF 加载器只需要从符号表中过滤出包含这个前缀的符号,然后 base64 解码后面的签名即可。(注:这里用到的 base64 并非标准 base64,原因可参考代码[2]

如果用户函数返回 struct 类型,那么 struct 本身的定义将会编码到独立的符号中。例如这个函数:

#[derive(StructType)]
struct KeyValue<'a> {
    key: &'a str,
    value: &'a str,
}
#[function("key_value(string) -> struct KeyValue")]
fn key_value(kv: &str) -> Option<KeyValue<'_>> {
    let (key, value) = kv.split_once('=')?;
    Some(KeyValue { key, value })
}

过程宏会分别导出两个符号:(中括号内的部分会被 base64 编码)

arrowudf_[key_value(string)->struct KeyValue]
arrowudt_[KeyValue=key:string,value:string]

其中定义 struct 的符号会以 arrowudt_ 作为前缀。这个符号不指向任何实体,仅通过名称编码其 schema。加载器找到所有函数和类型符号后,即可恢复出完整的函数签名。

如果你已经编译出一个 WebAssembly UDF 模块,不妨通过以下命令来查看其中的导出符号:

wasm-objdump -x udf.wasm | grep arrowud

加载执行

最后到了数据库加载 WebAssembly 模块和执行其中函数的环节。我们使用 wasmtime[3] 作为运行时。它首先加载解压后的 WebAssembly 模块,然后依据上面约定好的协议扫描所有函数和类型。用户调用函数时,它根据函数名找到对应函数的入口点。通过模块提供的 alloc 函数在其中动态分配一段内存保存输入数据。然后调用函数本身,获取输出。最后通过相反的步骤解析输出,释放内存。

由于每个 WebAssembly 实例只能单线程运行,为了支持多线程并行执行,我们还额外维护了一个实例池。每个线程每次从池子中取出一个空闲实例调用函数,如果没有就创建一个新的,用完后再归还回来。这样即可保证按需实例化容器,避免占用过多内存。(每个实例最少要消耗几 MB 的内存)

4相关话题讨论

最后,我们来探讨一下上述基于 WebAssembly 实现 Rust UDF 方案中,大家可能关心的一些技术问题。

性能

Rust UDF 是否能实现真正的高性能?如果不套一层 WebAssembly 提供隔离性的话,理论上可以通过动态链接库加载,直接运行原生指令。这样的 UDF 和内置函数性能是完全一样的。如果用户拥有对 RisingWave 的完全控制权,并追求极致性能,这可能是最适合的方案。但代价是 UDF 可能直接导致 RisingWave 阻塞甚至崩溃。用户需自行承担风险。

如果使用 WebAssembly JIT 运行,预热后纯函数的执行时间大约是原生的 1.5-2x 左右。但是考虑到数据传输的额外开销(外部 Arrow 编码 - 复制到 WASM 内部内存 - WASM 内部解码),实际端到端执行时间可能会更长。根据我们的 benchmark 结果,对于 gcd 这种简单计算,WebAssembly 运行时间是原生的 10x 左右。

我认为数据传输这块还有优化空间。未来可以尝试通过将 host 内存映射到 wasm 内部的方式,直接零拷贝地传递 Arrow 数组。这样可以避免很多编解码和内存拷贝开销。

外部访问

目前,WebAssembly 模块仅支持纯计算操作,几乎无法进行任何外部访问,包括访问网络、文件系统等。因为 wasmtime 默认就是沙箱模式。但为了支持模块使用 std,必须为它接入 WASI(可以理解为 wasm 的系统调用)。这样即使 UDF panic,至少可以通过 stderr 打印错误信息。不过,我们依然限制了 stdout 和 stderr 的缓冲区容量,以避免攻击者耗尽主机内存。

理论上说,我们可以逐步开放各种接口,让模块受控地访问网络(例如指定白名单地址)。这样我们甚至可以用 UDF 来实现自定义 source(参考社区用户的尝试[4])。但是目前这些都没有实现,有待未来进一步探索。

5总结

本文讨论了 RisingWave 中 Rust UDF 的设计与实现。Rust UDF 通过编译到 WebAssembly 实现隔离性,同时保持高性能。我们复用了内部函数框架中的 #[function] 过程宏,向用户提供了非常简洁的实现接口。用户函数会在编译期被包装成 C 接口,通过编码后的 Arrow RecordBatch 与调用者交换数据。用户函数签名和类型定义被编码在符号中,加载器通过扫描符号表即可找到所有函数和类型。最终通过 wasmtime 完成对函数的 JIT 执行。

不过依然要说明的是,这个工作其实和 RisingWave 并没有强绑定关系。任何 Rust 编写的数据处理系统都可以引入 arrow_udf_wasm[5] 这个库来瞬间获得执行 Rust UDF 的能力。

参考资料
[1]

内置函数框架https://www.notion.so/Rust-SQL-42de7d4d491a4fa8876c6b04f961e7c2?pvs=21

[2]

代码https://github.com/risingwavelabs/arrow-udf/blob/601d277fbfe8b1980c870497a9928f0df52124a2/arrow-udf-macros/src/gen.rs#L642

[3]

wasmtimehttps://github.com/bytecodealliance/wasmtime

[4]

社区用户的尝试https://github.com/risingwavelabs/risingwave/issues/16223

[5]

arrow_udf_wasm https://docs.rs/arrow-udf-wasm/0.2.2/arrow_udf_wasm/

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章