作者: 王润基|RisingWave Labs 内核开发工程师
RisingWave 允许用户使用 Python 和 Java 语言编写自定义函数(UDF)。这些函数运行在独立的进程中,以 RPC 的方式向 RisingWave 提供服务。我们称这种形式的 UDF 为 “外部函数”(External Function)。
外部函数的特点是它具有天然的隔离性和灵活性:函数本身的运行不会影响到 RisingWave 本体。用户可以在进程中做任何他们想做的事情,也可以将这个进程灵活部署在任何地方。但是,这种方式也有它的问题,比如 RPC 延迟高、异常处理复杂,以及额外的运维负担等。
在这篇文章中,我们就来讨论 RisingWave 外部函数的实现。
1用户接口
首先还是来看用户接口。要在 RisingWave 中创建一个 Python 外部函数,用户需要首先安装 arrow-udf
包,然后在一个文件中定义并注册函数,最后运行这个文件作为 UDF Server。
# Import components from the arrow-udf module
from arrow_udf import udf, udtf, UdfServer
# Define a scalar function that returns a single value
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
while y != 0:
(x, y) = (y, x % y)
return x
# Define a table function over a Python generator function
@udtf(input_types='INT', result_types='INT')
def series(n):
for i in range(n):
yield i
# Start a UDF server
if __name__ == '__main__':
server = UdfServer(location="0.0.0.0:8815")
server.add_function(gcd)
server.add_function(series)
server.serve()
服务启动后,在 RisingWave 中创建函数,与 Python 服务器建立连接。
CREATE FUNCTION gcd(int, int) RETURNS int
USING LINK 'http://localhost:8815' AS gcd;
CREATE FUNCTION series(int) RETURNS TABLE (x int)
USING LINK 'http://localhost:8815' AS series;
之后就可以像任何内置函数一样调用 Python 函数了。
2内部实现
在内部实现中,我们采用 Arrow Flight RPC[1] 框架作为内核和服务端的通信框架,并基于 Flight RPC 接口设计了一套简单的 UDF 调用协议。选择 Arrow Flight 的理由是,它是一套与语言无关的高效数据交换协议,并且为常见语言都提供了 RPC 库的实现。这使得我们能够快速搭建原型系统,而不必关心具体的 RPC 实现细节,未来还能方便地扩展到更多语言上去。
Arrow Flight 提供了多种数据交换接口。我们主要使用其中两个:
GetFlightInfo
:获取给定资源的元数据信息,例如 Schema。我们用它实现对给定函数的查询功能,用于在创建函数时检查签名是否匹配。DoExchange
:一个双向流式调用,可以同时发送和接收 Arrow 数据。我们用它实现对函数的调用操作。对于标量函数,客户端发送一个包含函数参数的 Batch,收到一个相同行数的返回值 Batch 对于表值函数,则会收到若干不定长的 Batch,表示函数返回的 Table 对于聚合函数,则是连续发送要聚合的参数,最后收到一个聚合结果(我们暂时没有实现它)
为了保持对 UDF Server 的向后兼容性,每当这个调用协议发生变化,都需要更新协议的版本号。Server 需要暴露自己当前的协议版本,以便客户端能正确调用。因此我们后来还使用了 DoAction
传递版本信息。
虽然内部使用了 Arrow 进行存储和通信,但是用户最终编写函数时,使用的都是语言的原生类型。因此我们提供的 SDK 的主要职责就是隐藏 Arrow 数据格式和批处理的具体细节,将传过来的数据层层转换,最终调用到用户提供的原生函数上去。
此时,不同语言的特性会使得它们的接口和内部实现略有不同。
对于 Python 这类动态类型语言,我们需要用户标注好每个函数输入输出的 SQL 类型:
@udf(input_types=['VARCHAR[]', 'INT'], result_type='VARCHAR')
def array_index(array, i):
# ...
而对于 Java 这类静态类型语言,我们则可以通过反射从 Java 类型自动推断出 SQL 类型:
public class ArrayIndex implements ScalarFunction {
public String eval(String[] array, int i) {
// => array_index(varchar[], int) -> varchar
// ...
}
}
Python 语言由于其动态类型的特性,使用 pyarrow 时可以直接从原生对象的 List 生成 Arrow array;但是 Java 语言不支持动态类型,因此使用 Arrow Java 包时,还需要手动枚举各种类型并实现相应的构建 Array 操作;更进一步,如果语言的动态灵活性更低,像 Rust 一样都没有运行时反射,那么就只能通过过程宏这种编译期手段解决问题了。
3问题探讨
最后我们来讨论一下外部函数在实际应用中遇到的问题。
首先是性能问题。众所周知,Python 语言代码运行起来十分缓慢。一旦用户逻辑稍微复杂一点,每个函数运行时间就会很长。再加上 Arrow 的批处理特性,服务端需要为每一行依次调用函数,最后一起返回。这就使得一次 RPC 的延迟可能非常高。而 UDF 可能只是庞大流处理管线中的一环,但是这一环就会导致整个流处理的阻塞。为了避免阻塞,一个简单的方法是设置超时机制,即一旦函数没有按时返回就放弃等待,结果直接填 NULL。这种方法在批处理中问题不大,但在流处理中还可能引发后续问题,即两次的计算结果不一致(一次超时一次正常),进而引发后续结果的错乱。因此可行的办法还是努力降低延迟。
为了降低高负载下的延迟,我们需要扩展处理能力。不幸的是,Python 语言的 GIL 再次成为阻碍,使得我们无法通过多线程的方式充分利用多核算力。因而,我们只能手动启动多个服务进程,然后通过代理进行负载均衡。但是,由于 Arrow Flight 依赖 gRPC 依赖 HTTP2 的多路复用特性,如果没有合理配置的话,还可能出现负载不均衡的情况。另一方面,如果用户函数不是计算密集型,而是存在外部调用的 IO 密集型,则可以通过多线程解决问题。我们允许用户在特定函数的 @udf
标记中加入 io_threads
参数,这样就会启用内部线程池对同一个 Batch 的每一行并发执行,以降低总的 RPC 延迟。
尽管计算延迟可以通过并行或并发降低,但整个 RPC 的延迟还是不可避免的受制于网络通信。而网络延迟主要取决于 UDF Server 和内核的部署位置。在实践中,理想情况是将他们部署在同一个节点上,这样延迟最低,但可能产生资源的竞争。更常见的做法是将它们部署在同一数据中心的不同节点上,这样通信会经过内网,但管理起来相对方便。需要注意的是,一定避免将它们部署到不同的数据中心中,这样会引入非常大的网络延迟(可能高达数百毫秒)。
4总结
RisingWave 支持 Python 和 Java 语言的外部函数 UDF。它内部基于 Arrow Flight RPC 进行通信。外部函数的灵活性让用户可以在 UDF 中做几乎任何事情,比如访问本地存储或外部服务。但是,它也有通信延迟高的天然劣势。如果你需要的只是一些纯计算操作,那么上一篇介绍的 Rust UDF 可能是更好的选择。
Arrow Flight RPC: https://arrow.apache.org/docs/format/Flight.html
关于 RisingWave
往期推荐
技术内幕