RisingWave 用户定义函数 (三):Python 外部函数

文摘   科技   2024-08-01 15:36   北京  

作者: 王润基|RisingWave Labs 内核开发工程师

RisingWave 允许用户使用 Python 和 Java 语言编写自定义函数(UDF)。这些函数运行在独立的进程中,以 RPC 的方式向 RisingWave 提供服务。我们称这种形式的 UDF 为 “外部函数”(External Function)。

外部函数的特点是它具有天然的隔离性和灵活性:函数本身的运行不会影响到 RisingWave 本体。用户可以在进程中做任何他们想做的事情,也可以将这个进程灵活部署在任何地方。但是,这种方式也有它的问题,比如 RPC 延迟高、异常处理复杂,以及额外的运维负担等。

在这篇文章中,我们就来讨论 RisingWave 外部函数的实现。

1用户接口

首先还是来看用户接口。要在 RisingWave 中创建一个 Python 外部函数,用户需要首先安装 arrow-udf 包,然后在一个文件中定义并注册函数,最后运行这个文件作为 UDF Server。

# Import components from the arrow-udf module
from arrow_udf import udf, udtf, UdfServer

# Define a scalar function that returns a single value
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x

# Define a table function over a Python generator function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i

# Start a UDF server
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(series)
    server.serve()

服务启动后,在 RisingWave 中创建函数,与 Python 服务器建立连接。

CREATE FUNCTION gcd(intintRETURNS int
USING LINK 'http://localhost:8815' AS gcd;

CREATE FUNCTION series(intRETURNS TABLE (x int)
USING LINK 'http://localhost:8815' AS series;

之后就可以像任何内置函数一样调用 Python 函数了。

2内部实现

在内部实现中,我们采用 Arrow Flight RPC[1] 框架作为内核和服务端的通信框架,并基于 Flight RPC 接口设计了一套简单的 UDF 调用协议。选择 Arrow Flight 的理由是,它是一套与语言无关的高效数据交换协议,并且为常见语言都提供了 RPC 库的实现。这使得我们能够快速搭建原型系统,而不必关心具体的 RPC 实现细节,未来还能方便地扩展到更多语言上去。

Arrow Flight RPC 示意图

Arrow Flight 提供了多种数据交换接口。我们主要使用其中两个:

  1. GetFlightInfo:获取给定资源的元数据信息,例如 Schema。我们用它实现对给定函数的查询功能,用于在创建函数时检查签名是否匹配。
  2. DoExchange一个双向流式调用,可以同时发送和接收 Arrow 数据。我们用它实现对函数的调用操作。
    1. 对于标量函数,客户端发送一个包含函数参数的 Batch,收到一个相同行数的返回值 Batch
    2. 对于表值函数,则会收到若干不定长的 Batch,表示函数返回的 Table
    3. 对于聚合函数,则是连续发送要聚合的参数,最后收到一个聚合结果(我们暂时没有实现它)

为了保持对 UDF Server 的向后兼容性,每当这个调用协议发生变化,都需要更新协议的版本号。Server 需要暴露自己当前的协议版本,以便客户端能正确调用。因此我们后来还使用了 DoAction 传递版本信息。

虽然内部使用了 Arrow 进行存储和通信,但是用户最终编写函数时,使用的都是语言的原生类型。因此我们提供的 SDK 的主要职责就是隐藏 Arrow 数据格式和批处理的具体细节,将传过来的数据层层转换,最终调用到用户提供的原生函数上去。

此时,不同语言的特性会使得它们的接口和内部实现略有不同。

对于 Python 这类动态类型语言,我们需要用户标注好每个函数输入输出的 SQL 类型:

@udf(input_types=['VARCHAR[]', 'INT'], result_type='VARCHAR')
def array_index(array, i):
    # ...

而对于 Java 这类静态类型语言,我们则可以通过反射从 Java 类型自动推断出 SQL 类型

public class ArrayIndex implements ScalarFunction {
    public String eval(String[] array, int i) {
      // => array_index(varchar[], int) -> varchar
        // ...
    }
}

Python 语言由于其动态类型的特性,使用 pyarrow 时可以直接从原生对象的 List 生成 Arrow array;但是 Java 语言不支持动态类型,因此使用 Arrow Java 包时,还需要手动枚举各种类型并实现相应的构建 Array 操作;更进一步,如果语言的动态灵活性更低,像 Rust 一样都没有运行时反射,那么就只能通过过程宏这种编译期手段解决问题了。

3问题探讨

最后我们来讨论一下外部函数在实际应用中遇到的问题。

首先是性能问题。众所周知,Python 语言代码运行起来十分缓慢。一旦用户逻辑稍微复杂一点,每个函数运行时间就会很长。再加上 Arrow 的批处理特性,服务端需要为每一行依次调用函数,最后一起返回。这就使得一次 RPC 的延迟可能非常高。而 UDF 可能只是庞大流处理管线中的一环,但是这一环就会导致整个流处理的阻塞。为了避免阻塞,一个简单的方法是设置超时机制,即一旦函数没有按时返回就放弃等待,结果直接填 NULL。这种方法在批处理中问题不大,但在流处理中还可能引发后续问题,即两次的计算结果不一致(一次超时一次正常),进而引发后续结果的错乱。因此可行的办法还是努力降低延迟。

为了降低高负载下的延迟,我们需要扩展处理能力。不幸的是,Python 语言的 GIL 再次成为阻碍,使得我们无法通过多线程的方式充分利用多核算力。因而,我们只能手动启动多个服务进程,然后通过代理进行负载均衡。但是,由于 Arrow Flight 依赖 gRPC 依赖 HTTP2 的多路复用特性,如果没有合理配置的话,还可能出现负载不均衡的情况。另一方面,如果用户函数不是计算密集型,而是存在外部调用的 IO 密集型,则可以通过多线程解决问题。我们允许用户在特定函数的 @udf 标记中加入 io_threads 参数,这样就会启用内部线程池对同一个 Batch 的每一行并发执行,以降低总的 RPC 延迟。

尽管计算延迟可以通过并行或并发降低,但整个 RPC 的延迟还是不可避免的受制于网络通信。而网络延迟主要取决于 UDF Server 和内核的部署位置。在实践中,理想情况是将他们部署在同一个节点上,这样延迟最低,但可能产生资源的竞争。更常见的做法是将它们部署在同一数据中心的不同节点上,这样通信会经过内网,但管理起来相对方便。需要注意的是,一定避免将它们部署到不同的数据中心中,这样会引入非常大的网络延迟(可能高达数百毫秒)。

4总结

RisingWave 支持 Python 和 Java 语言的外部函数 UDF。它内部基于 Arrow Flight RPC 进行通信。外部函数的灵活性让用户可以在 UDF 中做几乎任何事情,比如访问本地存储或外部服务。但是,它也有通信延迟高的天然劣势。如果你需要的只是一些纯计算操作,那么上一篇介绍的 Rust UDF 可能是更好的选择。

参考资料
[1]

Arrow Flight RPC: https://arrow.apache.org/docs/format/Flight.html

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

往期推荐

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析
RisingWave 助力乾象投资打造实时监控平台

RisingWave中文开源社区
RisingWave 是一款开源分布式 SQL 流数据库,致力于大幅降低流计算使用门槛与复杂度。RisingWave 已为全球超百家企业构建新一代流处理与分析平台。
 最新文章