在金融市场的瞬息万变中,量化投资凭借其数据驱动的决策优势,正逐步成为投资界的一股新势力。聚宽(JoinQuant)致力于成为基于国内金融市场大数据的量化研究先锋。我们运用量化研究、人工智能等前沿技术,持续挖掘市场规律、优化算法模型,并通过程序化交易实现策略的自动化执行。在聚宽典型的量化投资研究过程中,主要存在如下几个关键环节:
因子挖掘:利用先进的数据分析技术,在海量数据中挖掘对投资策略具有预测价值的关键变量。
收益预测:采用机器学习等先进技术,结合多个因子,构建对目标变量的精准预测模型,涵盖线性回归、决策树、神经网络等多种算法。
组合优化:在预期收益和风险约束的基础上,通过优化算法,实现投资组合的更好配置,以更大化投资回报。
回测检验:通过在历史数据上进行模拟交易,评估交易策略的有效性和稳定性。
上述的每个环节都是由海量数据驱动的数据密集型计算任务。依托阿里云提供的 ECS、ECI、ACK、NAS、OSS 等云产品,聚宽的技术团队在过去的几年内快速搭建起了一套较为完整的量化投研平台。
聚宽投研平台所应用的软件技术栈
我们以 Kubernetes 为底座,同时使用了阿里云 NAS、OSS、SLS、GPU 共享调度,HPA、Prometheus、Airflow、Prometheus 等云和云原生技术,得益于其计算成本和易于规模扩展的优势,以及容器化在高效部署和敏捷迭代方面的长处,囊括了越来越多的计算场景,例如海量金融历史数据驱动的因子计算、量化模型训练、投研策略回测等。
然而,在实际使用中,我们发现在云上对于数据密集型和弹性灵活性场景的真实量化投研产品支持还有诸多不足。
在量化投资研究的过程中,我们的投研平台遭遇了性能瓶颈、成本控制、数据集管理复杂性、数据安全问题以及使用体验等多重挑战。特别是在高并发访问和数据集管理方面,传统的 NAS 和 OSS 存储解决方案已无法满足我们对性能和成本效益的双重需求。数据科学家们在尝试实现更高效、更灵活的数据处理流程时,常常受限于现有技术的束缚。
数据管理难题:一路走来,我们的数据分散在不同的存储平台 NAS 和 OSS。研究员在进行因子挖掘时,需要结合分散在 NAS 和 OSS 上的数据。数据管理变得极其复杂,甚至需要手动将数据从一个平台迁移到另一个平台。
性能瓶颈:量化研究员在组合优化过程中,需要快速读取大量历史数据进行回测,此时通常会使用大量机器同时访问同一个数据集,这要求我们的存储系统能够提供极高的带宽,比如几百 Gbps 甚至 Tbps。但分布式文件存储系统的带宽受限,导致读取速度慢,影响了模型训练效率。
成本控制:量化研究员的投研实验时间非常不确定,有时候一天会有大量实验,有时候可能一整天都没有。这导致我们的带宽需求波动很大,如果总是预留大量带宽,大部分时间其实是在浪费资源,增加了不必要的成本。
数据安全顾虑:在研究员团队之间需要隔离不同的数据集,然而在 OSS 的同一个存储桶下 / 同一个 NAS 实例下数据无法被有效隔离。
技术使用门槛:量化研究员很多都是数据科学家出身,对 Kubernetes 不太熟悉,而使用 YAML 配置多个持久卷声明(PVC)来管理数据源对他们来说是一项挑战。
动态数据源挂载问题:量化研究员们在在使用 Jupyter Notebook 进行数据处理时,频繁需要挂载新的数据源。但每次挂载操作都会导致 Notebook 环境重启,严重影响效率。
我们发现仅靠 Kubernetes 的 CSI 体系无法满足我们对多数据源加速的需求,而 CNCF Sandbox 项目 Fluid 提供了一种简单的方式来统一管理和加速多个 Persistent Volume Claim 的数据,包括来自 OSS 和 NAS 的数据。
Fluid 支持多种 Runtime,包括 Jindo、Alluxio、JuiceFS、VIneyard 等分布式缓存系统;经过甄选比较,我们发现其中场景匹配、性能和稳定性比较突出的是 JindoRuntime。JindoRuntime 基于 JindoCache 的分布式缓存加速引擎。JindoCache (前身为 JindoFSx )是阿里云数据湖管理提供的云原生数据湖加速产品,适配 OSS、HDFS、标准 S3 协议、POSIX 等多种协议,在此基础上支持数据缓存、元数据缓存等功能。
进一步的调研和使用后,我们发现许多困扰研究员的问题都得到了有效解决,包括缓存成本、数据集安全管理和灵活使用的需求。目前整个系统基于 Fluid 已经平稳运行近一年,为量化研究团队带来了巨大帮助。以下我们分享一下我们的经验和收获:
问题:在进行不同类型的数据处理任务时,发现单一的数据存储配置无法满足需求。例如,训练任务的数据集需要设置为只读,而中间生成的特征数据和 checkpoint 则需要读写权限。传统的 Persistent Volume Claim (PVC) 无法灵活地同时处理来自不同存储的数据源。
方案:使用 Fluid,量化研究员可以为每一种数据类型设置不同的存储策略。例如,在同一个存储系统内,训练数据可以配置为只读,而特征数据和 checkpoint 可以配置为读写。这样,Fluid 帮助客户实现了不同数据类型在同一 PVC 中的灵活管理,提高了资源利用率和性能收益。
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: training-data
spec:
mounts:
- mountPoint: "pvc://nas/training-data"
path: "/training-data"
accessModes: ReadOnlyMany
---
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: checkpoint
spec:
mounts:
- mountPoint: "pvc://nas/checkpoint"
path: "/checkpoints"
accessModes: ReadWriteMany
问题:在运行量化分析时,计算资源需求波动很大。在高峰时段,需要短时间内调度大量计算实例,但在非高峰时段,资源需求几乎为零。固定预留资源不仅造成高成本,还导致资源浪费。
方案:Fluid 支持多种弹性策略,允许客户根据业务需求动态扩缩容。通过应用预热和按照业务规律进行扩缩容操作,Fluid 帮助客户在高峰时快速扩展计算资源,同时通过维护自我管理的数据缓存,实现数据缓存吞吐的动态弹性控制。这种弹性策略不仅提升了资源使用效率,还有效降低了运营成本。
问题:量化研究员使用 GPU 进行高密度数据计算,但每次任务调度时数据访问延时高,影响了整体计算性能。由于 GPU 资源昂贵,团队希望在调度 GPU 时数据能够尽可能接近计算节点。
方案:通过数据缓存感知的调度,Fluid 在应用调度时能够提供数据缓存位置信息给 Kubernetes 调度器。这让客户的应用能够调度到缓存节点或更接近缓存的节点,从而减少数据访问延时,最大化 GPU 资源的使用效率。
问题:量化研究员使用异构的数据源,存在一些方案,但无法满足跨存储数据集同时加速的需求,使用差异也给运维团队带来了适配复杂性。
方案:Fluid 提供了统一的 PVC 加速能力,OSS 数据和 NAS 数据皆可按创建 Dataset、扩容 Runtime、执行 Dataload 的流程进行操作。JindoRuntime 的 PVC 加速功能简单易用且性能都满足需求。
问题:需要确保数据的隔离和共享。为了保护敏感数据,需要在计算任务和数据上实现访问控制与隔离。同时,相对公开的数据需要方便研究员访问和使用。
方案:Fluid 的 Dataset 通过 Kubernetes 的 namespace 资源隔离机制,实现了不同团队之间的数据集访问控制。这样既保护了数据隐私,又满足了数据隔离的需求。同时,Fluid 支持跨 Namespace 的数据访问,使得公开数据集可以在多个团队中重复使用,实现一次缓存,多个团队共享,大幅提升了数据利用率和管理的便捷性。
问题:量化研究员主要用 Python 进行开发,但当需要在 Kubernetes 环境下运行时,他们必须学习并使用 YAML。许多研究员表示 YAML 学不会,这给他们带来了很大的学习成本和开发效率的降低。
方案:Fluid 提供了 DataFlow 数据流功能,允许用户利用 Fluid 的 API 定义自动化数据处理流程,包括缓存扩容 / 缩容、预热、迁移和自定义的数据处理相关操作。最值得一提的是,这些操作都可以通过 Python 接口完成,实现在本地开发环境和生产环境中使用同一套代码进行精准预测模型的开发和训练。
问题:研究员在 Jupyter 容器中工作时,常需要动态挂载新的存储数据源。传统方式需要重启 Pod,这不仅浪费时间,还打断了他们的工作流程,是他们诟病已久的问题。
方案:Fluid 的 Dataset 功能支持描述多个数据源,并允许用户动态挂载或卸载新旧挂载点,且这些改变即时对用户容器可见,无需重启。这解决了数据科学家对容器使用的最大抱怨。
尽管开源的 Fluid 有诸多优点,但在实践中我们发现它并不能完全满足我们的需求:
对多类型弹性资源的不完全支持。以阿里云为例,我们使用的弹性资源包含 ECS 和 ECI, 在工作负载调度时,系统会优先调度至 ECS, 当 ECS 资源耗尽后,才转向 ECI。因此,我们需要 Fluid 同时支持这两种资源。但据我们了解,开源 Fluid 的 FUSE Sidecar 需要依赖 privileged 权限,而在 ECI 上无法实现。
开源 Fluid 在监控和可观测性的方案有限,而且配置比较复杂。对于生产系统来说,完整的监控日志方案还是比较重要的,但是自己开发比较麻烦。
开源 Fluid 并不支持动态挂载,这对于数据科学家来说是比较刚性的需求。
我们开始寻找解决方案并发现了阿里云 ACK 云原生 AI 套件中的 ack-fluid,它可以很好地解决这些问题:
ack-fluid 基于开源 Fluid 标准对于 JindoRuntime 提供了完整的支持,我们在线下开源 Fluid 上完成调试,在 ACK 上就可以获得完整能力。
ack-fluid 无缝支持阿里云的 ECI,且不需要开启 privileged 权限,完全满足了我们在云上弹性容器实例 ECI 访问不同数据源的需求。
ack-fluid 集成了功能齐全的监控大盘,且易于获取,我们只需在阿里云 Prometheus 监控中进行一键安装即可。
同时 ack-fluid 支持在 ECS 和 ECI 上实现动态挂载多数据源的能力,这也是我们极为看重的功能。
Fluid 的 JindoRuntime 优先选择具有高网络 IO 和大内存能力的 ECS 和 ECI 作为缓存 worker。云服务器 ECS 的网络能力不断提升,当前的网络带宽已经远超 SSD 云盘的 IO 能力。以阿里云上的 ECS 规格 ecs.g8i.16xlarge 为例,它的基础网络带宽值达到 32Gbps,内存为 256GiB。假设提供两台这样的 ECS,那么理论上,仅用 2 秒就能完成 32GB 数据的读取。
在离线计算这类容错率较高的场景上,可以考虑使用 Spot 实例作为缓存 Worker,并且增加 K8s 注解 "k8s.aliyun.com/eci-spot-strategy": "SpotAsPriceGo"。这样既能享受 Spot 实例带来的成本优惠,而又能保证较高的稳定性。
由于业务特点,投研平台的吞吐用量有着非常明显的潮汐特征,因此简单的配置定时缓存节点的弹性伸缩策略能到达到不错的收益,包括成本的控制和对性能提升。针对研究员单独需求的数据集,也可以预留接口供他们手动伸缩。
apiVersion:autoscaling.alibabacloud.com/v1beta1
kind: CronHorizontalPodAutoscaler
metadata:
name: joinquant-research
namespace: default
spec:
scaleTargetRef:
apiVersion: data.fluid.io/v1alpha1
kind: JindoRuntime
name: joinquant-research
jobs:
- name: "scale-down"
schedule: "0 0 7 ? * 1"
targetSize: 10
- name: "scale-up"
schedule: "0 0 18 ? * 5-6"
targetSize: 20
JindoRuntime 支持在读数据的同时进行预热,但由于这种同时执行的方式会带来较大的性能开销,我们的经验是先进行预热,同时监控缓存比例。一旦缓存比例达到一定阈值,就开始触发任务下发。这避免了提前运行高并发任务导致的 IO 延迟问题。
定期更新缓存中的元数据以适应后端存储数据的周期性变化
为了提升数据加载速度,我们选择将 JindoRuntime 的 Master 中的元数据长期保留并多次拉取。同时,由于业务数据经常被采集并存储到后端存储系统中,这一过程并未经过缓存,导致 Fluid 无法感知。为解决这个问题,我们通过配置 Fluid 定时执行数据预热,从而同步底层存储系统的数据变化。
我们在 Dataset 和 DataLoad 的配置文件中进行了相关设置:
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: joinquant-dataset
spec:
...
# accessModes: ["ReadOnlyMany"] ReadOnlyMany为默认值
---
apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
name: joinquant-dataset-warmup
spec:
...
policy: Cron
schedule: "0 0 * * *" # 每日0点执行数据预热
loadMetadata: true # 数据预热时同步底层存储系统数据变化
target:
- path: /path/to/warmup
在实际评估中,我们使用了 20 个规格为 ecs.g8i.16xlarge 的 ECI 作为 worker 节点来构建 JindoRuntime 集群,单个 ECI 的带宽上限为 32Gbps; 同时任务节点选择 ecs.gn7i-c16g1.4xlarge, 带宽上限为 16Gbps;拉取文件大小 30GiB;高性能分布式存储峰值带宽为 3GB/s, 且该带宽仅随着存储使用容量增加而提高。同时为了提升数据读取速度,我们选择使用内存进行数据缓存。
为便于对比,我们统计了访问耗时数据,并与利用 Fluid 技术访问数据的耗时进行了对比。结果如下所示:
当并发 Pod 数量较少时,传统高性能分布式存储的带宽能够满足需求,因此 Fluid 并未展现出明显优势。然而,随着并发 Pod 数量的增加,Fluid 的性能优势愈发显著。当并发扩展到 10 个 Pod 时,使用 Fluid 可以将平均耗时缩短至传统方式的 1/5;而当扩展到 100 个 Pod 时,数据访问时间从 15 分钟缩短至 38.5 秒,计算成本也降低为十分之一。这大幅提升了任务处理速度,并显著降低了由于 IO 延迟带来的 ECI 成本。
更为重要的是,Fluid 系统的数据读取带宽与 JindoRuntime 集群规模正相关,如果需要扩容更多的 Pod,我们可以通过修改 JindoRuntime 的 Replicas 来增加数据带宽。这种动态扩容的能力是传统的分布式存储无法满足的。
通过使用 Fluid,我们在只读数据集场景中获得了更多的灵活性和可能性。Fluid 使我们认识到,除了计算资源,数据缓存也可以视为一种无状态的弹性资源,可以与 Kubernetes 的弹性伸缩策略相结合,以适应工作负载的变化,满足我们在使用中对于高峰和低谷的需求。
总而言之,Fluid 无疑带来了数据缓存的弹性特性,提高了工作效率,为我们的工作带来了实质性的好处。
我们未来会和 Fluid 社区持续合作,一方面更好解决我们每天遇到的问题,同时也推动社区的改进:
任务弹性和数据缓存弹性的协同:我们的业务系统能理解并预测一段时间内使用相同数据集的任务并发量。在任务排队的过程中,我们可以执行数据预热和弹性扩缩容。当数据缓存或者数据访问吞吐满足一定的条件时,此时的系统可以将排队任务从等待状态变为可用状态。这种策略可以让我们更有效地利用资源,提升任务的处理速度,减少等待时间。
优化 Dataflow 的数据亲合性能力:Fluid 提供了 Dataflow 功能,可以帮助我们将数据运维操作和业务中的数据处理串联起来。然而,由于不同的操作会在不同的节点进行,这可能会降低整体的性能。我们期望后续的操作能在前序操作相同的节点 / 可用区 / 地区进行,从而提高整体的执行效率和数据处理速度。这种优化可以减少数据传输的时间,提升整体的性能。
安装 ACK Fluid 的管理组件,参看文档:https://help.aliyun.com/zh/ack/cloud-native-ai-suite/user-guide/overview-of-fluid
创建基于 NAS 的 PVC,可以参考文档:https://help.aliyun.com/zh/eci/user-guide/mount-a-nas-file-system, 获得的输出
$ kubectl get pvc,pv
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
persistentvolumeclaim/static-pvc-nas Bound demo-pv 5Gi RWX 19h
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
persistentvolume/static-pv-nas 30Gi RWX Retain Bound default/static-pvc-nas 19h
通过 python SDK 创建数据集,并且按照顺序完成扩容和数据预热(也可以通过 YAML 的方式创建)
import fluid
from fluid import constants
from fluid import models
# Connect to the Fluid using the default kubeconfig file and create a Fluid client instance
client_config = fluid.ClientConfig()
fluid_client = fluid.FluidClient(client_config)
# Create a Dataset named mydata under default namespace,
# it's ReadOnly by default.
fluid_client.create_dataset(
dataset_name="mydata",
mount_name="/",
mount_point="pvc://static-pvc-nas/mydata"
)
# Initialize the configuration of the JindoRuntime runtime
# and bind the Dataset to that runtime.
# The number of replicas is 1 and the memory is 30Gi respectively
# Then scale out the elastic cache and preload the data in specified path
dataflow = dataset.bind_runtime(
runtime_type=constants.JINDO_RUNTIME_KIND,
replicas=1,
cache_capacity_GiB=30,
cache_medium="MEM",
wait=True
).scale_cache(replicas=2).preload(target_path="/train")
# Submit the data flow and wait for its completion
run = dataflow.run()
run.wait()
此时可以查看数据集的状态,可以看到数据缓存完成,就可以很简单地开始使用缓存过的数据:
$ kubectl get dataset mydata
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE
mydata 52.95GiB 52.95GiB 60.00GiB 100.0% Bound 2m19s
向东伟,聚宽高级量化工程师。毕业于北京大学信息科学技术学院,专注于量化投研 MLSys 领域。
陶宇宸,聚宽高级量化工程师。毕业于长春理工大学,专注于大数据领域,曾获 ACM / ICPC 区域赛银奖、邀请赛金奖。
聚宽以平台赋能策略创新,走出一条“技术驱动的量化基金”之路。聚宽多次荣获中国私募金牛奖、英华奖、金长江奖等行业殊荣,并已取得投顾资格。聚宽凭借扎实的策略研发与高效的投研团队稳步发展,致力于为投资者持续创造价值。
转载自https://mp.weixin.qq.com/s/U9blYoOc40EPKK0GlCJWzA