组成
使用 Dask Futures 并行化任何 Python 代码,让您扩展任何函数和 for 循环,并在任何情况下为您提供解决能力。
Dask由两部分组成:
针对计算优化的动态任务调度。这类似于 Airflow、Luigi、Celery 或 Make,但针对交互式计算工作负载进行了优化。
“大数据”集合,如并行数组、数据帧和列表,将 NumPy、Pandas 或 Python 迭代器等常见接口扩展到大于内存或分布式环境。这些并行集合在动态任务调度程序之上运行。
Dask 有以下优点:
通用:提供并行化的 NumPy 数组和 Pandas DataFrame 对象 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面。 原生:在纯 Python 中启用分布式计算,并访问 PyData 堆栈。 快速:以低开销、低延迟和快速数值算法所需的最小序列化运行 扩容:在具有 1000 个内核的集群上弹性运行 缩容:在单个进程中在笔记本电脑上设置和运行微不足道 响应式:设计时考虑到交互式计算,它提供快速反馈和诊断人性化
以下是高级集合用于生成可由调度程序在单台机器或集群上执行的任务图:
Python 已经发展成为数据分析和通用编程领域的主导语言。NumPy、pandas 和 scikit-learn 等计算库推动了这种增长。然而,这些包的设计并不是为了扩展到单台机器之外。Dask 被开发用于在数据集超过内存时将这些包和周围的生态系统本地扩展为多核机器和分布式集群。
快速使用
安装
安装 Dask:
pip install dask # 仅安装 dask 核心功能
pip install "dask[complete]" # 安装所有功能
pip install "dask[array]" # array 支持
pip install "dask[dataframe]" # dataframe 支持
pip install "dask[diagnostics]" # 可视化dask诊断
pip install "dask[distributed]" # 分布式支持
conda install dask # conda 安装
常用导入模块及约定别名:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
根据您使用的数据类型,导入适当的模块。
构造数据结构
利用现有数据构造 Dask 对象。
# 构造 DataFrame
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400),
"b": list("abcaddbe" * 300)},
index=index)
ddf = dd.from_pandas(df, npartitions=10)
ddf
'''
Dask DataFrame Structure:
a b
npartitions=10
2021-09-01 00:00:00 int64 object
2021-09-11 00:00:00 ... ...
... ...
2021-11-30 00:00:00 ... ...
2021-12-09 23:00:00 ... ...
Dask Name: from_pandas, 10 tasks
'''
现在我们有一个包含 2 列和 2400 行的 DataFrame,由 10 个分区组成,每个分区有 240 行。每个分区代表一段数据。
一些关键的属性:
# 每个分区覆盖的索引值
ddf.divisions
# 访问特定分区
ddf.partitions[1]
Array 结构:
data = np.arange(100_000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a
'''
dask.array<array, shape=(200, 500),
dtype=int64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
现在我们有一个形状为 (200, 500) 的二维数组,由 10 个块组成,其中每个块的形状为 (100, 100)。每个块代表一段数据。
以下是数组的一些关键属性:
# 检查块
a.chunks
# ((100, 100), (100, 100, 100, 100, 100))
# 访问特定的数据块
a.blocks[1, 3]
索引
索引 Dask 集合就像切片 numpy 数组或 pandas 的 DataFrame。
ddf.b'''
Dask Series Structure:
npartitions=10
2021-09-01 00:00:00 object
2021-09-11 00:00:00 ...
...
2021-11-30 00:00:00 ...
2021-12-09 23:00:00 ...
Name: b, dtype: object
Dask Name: getitem, 20 tasks
'''
ddf["2021-10-01": "2021-10-09 5:00"]
'''
Dask DataFrame Structure:
a b
npartitions=1
2021-10-01 00:00:00.000000000 int64 object
2021-10-09 05:00:59.999999999 ... ...
Dask Name: loc, 11 tasks
'''
Array 结构的索引:
a[:50, 200]
'''
dask.array<getitem, shape=(50,),
dtype=int64, chunksize=(50,),
chunktype=numpy.ndarray>
'''
计算
Dask 采用懒惰评估机制,直到你要求计算的结果才会被计算出来。相应地,会生成一个用于计算的 Dask 任务图。
任何时候你有一个 Dask 对象并且你想得到结果,调用 compute 计算:
ddf.a.mean()# dd.Scalar<series-..., dtype=float64>
ddf.a.mean().compute()
# 1199.5
ddf.b.unique()
'''
Dask Series Structure:
npartitions=1
object
...
Name: b, dtype: object
Dask Name: unique-agg, 33 tasks
'''
ddf.b.unique().compute()
'''
0 a
1 b
2 c
3 d
4 e
Name: b, dtype: object
'''
方法可以像 pandas 一样链接在一起:
result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100
result
'''
Dask Series Structure:
npartitions=1
2021-10-01 00:00:00.000000000 int64
2021-10-09 05:00:59.999999999 ...
Name: a, dtype: int64
Dask Name: sub, 16 tasks
'''
result.compute()
'''
2021-10-01 00:00:00 620
2021-10-01 01:00:00 1341
2021-10-01 02:00:00 2063
2021-10-01 03:00:00 2786
2021-10-01 04:00:00 3510
...
2021-10-09 01:00:00 158301
2021-10-09 02:00:00 159215
2021-10-09 03:00:00 160130
2021-10-09 04:00:00 161046
2021-10-09 05:00:00 161963
Freq: H, Name: a, Length: 198, dtype: int64
'''
Array 的计算:
a.mean()
'''
dask.array<mean_agg-aggregate, shape=(),
dtype=float64, chunksize=(), chunktype=numpy.ndarray>
'''
a.mean().compute()
# 49999.5
np.sin(a)
'''
dask.array<sin, shape=(200, 500),
dtype=float64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
np.sin(a).compute()
'''
array([[ 0. , 0.84147098, 0.90929743, ..., 0.58781939,
0.99834363, 0.49099533],
[-0.46777181, -0.9964717 , -0.60902011, ..., -0.89796748,
-0.85547315, -0.02646075],
[ 0.82687954, 0.9199906 , 0.16726654, ..., 0.99951642,
0.51387502, -0.4442207 ],
...,
[-0.99720859, -0.47596473, 0.48287891, ..., -0.76284376,
0.13191447, 0.90539115],
[ 0.84645538, 0.00929244, -0.83641393, ..., 0.37178568,
-0.5802765 , -0.99883514],
[-0.49906936, 0.45953849, 0.99564877, ..., 0.10563876,
0.89383946, 0.86024828]])
'''
a.T
'''
dask.array<transpose, shape=(500, 200),
dtype=int64, chunksize=(100, 100),
chunktype=numpy.ndarray>
'''
a.T.compute()
'''
array([[ 0, 500, 1000, ..., 98500, 99000, 99500],
[ 1, 501, 1001, ..., 98501, 99001, 99501],
[ 2, 502, 1002, ..., 98502, 99002, 99502],
...,
[ 497, 997, 1497, ..., 98997, 99497, 99997],
[ 498, 998, 1498, ..., 98998, 99498, 99998],
[ 499, 999, 1499, ..., 98999, 99499, 99999]])
'''
方法可以像在NumPy中一样链接在一起
b = a.max(axis=1)[::-1] + 10
b
'''
dask.array<add, shape=(200,),
dtype=int64, chunksize=(100,),
chunktype=numpy.ndarray>
'''
b[:10].compute()
'''
array([100009, 99509, 99009, 98509, 98009,
97509, 97009, 96509,
96009, 95509])
'''
惰性计算
通常在并行化现有代码库或构建自定义算法时,您会遇到可并行化的代码,但不仅仅是大型 DataFrame 或数组。
Dask Delayed 允许您将单个函数调用包装到一个延迟构造的任务图中:
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # 没有开始计算
b = inc(2) # 没有开始计算
c = add(a, b) # 没有开始计算
c = c.compute() # 这会触发上述所有计算
Futures
与之前描述的接口不同,Futures 是即将执行的,提交函数后立即开始计算。
from dask.distributed import Client
client = Client()
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 1) # 任务立即开始
b = client.submit(inc, 2) # 任务立即开始
c = client.submit(add, a, b) # 任务立即开始
c = c.result() # 阻塞直到任务完成,然后收集结果
调度
生成任务图后,调度程序的工作就是执行它。
默认情况下,当您在 Dask 对象上调用 compute 时,Dask 使用计算机上的线程池并行运行计算。
如果您想要更多控制,请改用分布式调度程序。尽管名称中有“分布式”,但分布式调度程序在单台和多台机器上都能很好地工作。将其视为“高级调度程序”。
用自己的计算机的集群的方式:
from dask.distributed import Client
client = Client()
client
# <Client: 'tcp://127.0.0.1:41703'
# processes=4 threads=12, memory=31.08 GiB>
连接到已在运行的集群的方式。
from dask.distributed import Client
client = Client("<url-of-scheduler>")
client
# <Client: 'tcp://127.0.0.1:41703' processes=4
# threads=12, memory=31.08 GiB>
以上各方法更加详细的内容可以参考官方文档。
参考
https://docs.dask.org/en/stable