Dask,一个功能强大的大数据并行计算 Python 库

文摘   2024-11-13 19:58   山东  
大家好,今天 668 给大家分享作为 Python 最牛逼的大数据并行计算工具之一的 – Dask。

Dask 是 Python 中用于大量数据并行计算的库,支持各种灵活的方法。Dask 可以轻松扩展您熟悉和喜爱的 Python 库,例如 NumPy、pandas 和 scikit-learn。

组成

使用 Dask Futures 并行化任何 Python 代码,让您扩展任何函数和 for 循环,并在任何情况下为您提供解决能力。

Dask由两部分组成:

  • 针对计算优化的动态任务调度。这类似于 Airflow、Luigi、Celery 或 Make,但针对交互式计算工作负载进行了优化。

  • “大数据”集合,如并行数组、数据帧和列表,将 NumPy、Pandas 或 Python 迭代器等常见接口扩展到大于内存或分布式环境。这些并行集合在动态任务调度程序之上运行。

Dask 有以下优点:

  • 通用:提供并行化的 NumPy 数组和 Pandas DataFrame 对象
  • 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面。
  • 原生:在纯 Python 中启用分布式计算,并访问 PyData 堆栈。
  • 快速:以低开销、低延迟和快速数值算法所需的最小序列化运行
  • 扩容:在具有 1000 个内核的集群上弹性运行
  • 缩容:在单个进程中在笔记本电脑上设置和运行微不足道
  • 响应式:设计时考虑到交互式计算,它提供快速反馈和诊断人性化

以下是高级集合用于生成可由调度程序在单台机器或集群上执行的任务图:

Python 已经发展成为数据分析和通用编程领域的主导语言。NumPy、pandas 和 scikit-learn 等计算库推动了这种增长。然而,这些包的设计并不是为了扩展到单台机器之外。Dask 被开发用于在数据集超过内存时将这些包和周围的生态系统本地扩展为多核机器和分布式集群。

快速使用

安装

安装 Dask:

pip install dask                # 仅安装 dask 核心功能pip install "dask[complete]"    # 安装所有功能
pip install "dask[array]" # array 支持pip install "dask[dataframe]" # dataframe 支持pip install "dask[diagnostics]" # 可视化dask诊断pip install "dask[distributed]" # 分布式支持
conda install dask # conda 安装

常用导入模块及约定别名:

import numpy as npimport pandas as pd
import dask.dataframe as ddimport dask.array as daimport dask.bag as db

根据您使用的数据类型,导入适当的模块。

构造数据结构

利用现有数据构造 Dask 对象。

# 构造 DataFrameindex = pd.date_range("2021-09-01", periods=2400, freq="1H")df = pd.DataFrame({"a": np.arange(2400),                   "b": list("abcaddbe" * 300)},                  index=index)ddf = dd.from_pandas(df, npartitions=10)ddf'''Dask DataFrame Structure:                        a       bnpartitions=102021-09-01 00:00:00  int64  object2021-09-11 00:00:00    ...     ...                   ...     ...2021-11-30 00:00:00    ...     ...2021-12-09 23:00:00    ...     ...Dask Name: from_pandas, 10 tasks'''

现在我们有一个包含 2 列和 2400 行的 DataFrame,由 10 个分区组成,每个分区有 240 行。每个分区代表一段数据。

一些关键的属性:

# 每个分区覆盖的索引值ddf.divisions# 访问特定分区ddf.partitions[1]

Array 结构:

data = np.arange(100_000).reshape(200, 500)a = da.from_array(data, chunks=(100, 100))a'''dask.array<array, shape=(200, 500), dtype=int64, chunksize=(100, 100), chunktype=numpy.ndarray>'''

现在我们有一个形状为 (200, 500) 的二维数组,由 10 个块组成,其中每个块的形状为 (100, 100)。每个块代表一段数据。

以下是数组的一些关键属性:

# 检查块a.chunks# ((100, 100), (100, 100, 100, 100, 100))
# 访问特定的数据块a.blocks[1, 3]

索引

索引 Dask 集合就像切片 numpy 数组或 pandas 的 DataFrame。

ddf.b'''Dask Series Structure:npartitions=102021-09-01 00:00:00    object2021-09-11 00:00:00       ...                        ...2021-11-30 00:00:00       ...2021-12-09 23:00:00       ...Name: b, dtype: objectDask Name: getitem, 20 tasks'''
ddf["2021-10-01": "2021-10-09 5:00"]'''Dask DataFrame Structure: a bnpartitions=12021-10-01 00:00:00.000000000 int64 object2021-10-09 05:00:59.999999999 ... ...Dask Name: loc, 11 tasks'''

Array 结构的索引:

a[:50, 200]'''dask.array<getitem, shape=(50,), dtype=int64, chunksize=(50,),  chunktype=numpy.ndarray>'''

计算

Dask 采用懒惰评估机制,直到你要求计算的结果才会被计算出来。相应地,会生成一个用于计算的 Dask 任务图。

任何时候你有一个 Dask 对象并且你想得到结果,调用 compute 计算:

ddf.a.mean()# dd.Scalar<series-..., dtype=float64>
ddf.a.mean().compute()# 1199.5
ddf.b.unique()'''Dask Series Structure:npartitions=1 object ...Name: b, dtype: objectDask Name: unique-agg, 33 tasks'''
ddf.b.unique().compute()'''0 a1 b2 c3 d4 eName: b, dtype: object'''

方法可以像 pandas 一样链接在一起:

result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100result'''Dask Series Structure:npartitions=12021-10-01 00:00:00.000000000    int642021-10-09 05:00:59.999999999      ...Name: a, dtype: int64Dask Name: sub, 16 tasks'''
result.compute()'''2021-10-01 00:00:00 6202021-10-01 01:00:00 13412021-10-01 02:00:00 20632021-10-01 03:00:00 27862021-10-01 04:00:00 3510 ...2021-10-09 01:00:00 1583012021-10-09 02:00:00 1592152021-10-09 03:00:00 1601302021-10-09 04:00:00 1610462021-10-09 05:00:00 161963Freq: H, Name: a, Length: 198, dtype: int64'''

Array 的计算:

a.mean()'''dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>'''
a.mean().compute()# 49999.5
np.sin(a)'''dask.array<sin, shape=(200, 500), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>'''
np.sin(a).compute()'''array([[ 0. , 0.84147098, 0.90929743, ..., 0.58781939, 0.99834363, 0.49099533], [-0.46777181, -0.9964717 , -0.60902011, ..., -0.89796748, -0.85547315, -0.02646075], [ 0.82687954, 0.9199906 , 0.16726654, ..., 0.99951642, 0.51387502, -0.4442207 ], ..., [-0.99720859, -0.47596473, 0.48287891, ..., -0.76284376, 0.13191447, 0.90539115], [ 0.84645538, 0.00929244, -0.83641393, ..., 0.37178568, -0.5802765 , -0.99883514], [-0.49906936, 0.45953849, 0.99564877, ..., 0.10563876, 0.89383946, 0.86024828]])
'''
a.T'''dask.array<transpose, shape=(500, 200), dtype=int64, chunksize=(100, 100), chunktype=numpy.ndarray>'''
a.T.compute()'''array([[ 0, 500, 1000, ..., 98500, 99000, 99500], [ 1, 501, 1001, ..., 98501, 99001, 99501], [ 2, 502, 1002, ..., 98502, 99002, 99502], ..., [ 497, 997, 1497, ..., 98997, 99497, 99997], [ 498, 998, 1498, ..., 98998, 99498, 99998], [ 499, 999, 1499, ..., 98999, 99499, 99999]])'''

方法可以像在NumPy中一样链接在一起

b = a.max(axis=1)[::-1] + 10b'''dask.array<add, shape=(200,), dtype=int64, chunksize=(100,),  chunktype=numpy.ndarray>'''
b[:10].compute()'''array([100009, 99509, 99009, 98509, 98009, 97509, 97009, 96509, 96009, 95509])'''

惰性计算

通常在并行化现有代码库或构建自定义算法时,您会遇到可并行化的代码,但不仅仅是大型 DataFrame 或数组。

Dask Delayed 允许您将单个函数调用包装到一个延迟构造的任务图中:

import dask
@dask.delayeddef inc(x): return x + 1
@dask.delayeddef add(x, y): return x + y
a = inc(1) # 没有开始计算b = inc(2) # 没有开始计算c = add(a, b) # 没有开始计算
c = c.compute() # 这会触发上述所有计算

Futures

与之前描述的接口不同,Futures 是即将执行的,提交函数后立即开始计算。

from dask.distributed import Client
client = Client()
def inc(x): return x + 1
def add(x, y): return x + y
a = client.submit(inc, 1) # 任务立即开始b = client.submit(inc, 2) # 任务立即开始c = client.submit(add, a, b) # 任务立即开始
c = c.result() # 阻塞直到任务完成,然后收集结果

调度

生成任务图后,调度程序的工作就是执行它。

默认情况下,当您在 Dask 对象上调用 compute 时,Dask 使用计算机上的线程池并行运行计算。

如果您想要更多控制,请改用分布式调度程序。尽管名称中有“分布式”,但分布式调度程序在单台和多台机器上都能很好地工作。将其视为“高级调度程序”。

用自己的计算机的集群的方式:

from dask.distributed import Client
client = Client()client# <Client: 'tcp://127.0.0.1:41703' # processes=4 threads=12, memory=31.08 GiB>

连接到已在运行的集群的方式。

from dask.distributed import Client
client = Client("<url-of-scheduler>")client# <Client: 'tcp://127.0.0.1:41703' processes=4# threads=12, memory=31.08 GiB>

以上各方法更加详细的内容可以参考官方文档。

参考

  • https://docs.dask.org/en/stable

668号厅苏女士
668小苏专业吐槽副业试片 天天追新剧,偶尔补老番 感谢关注,喜欢就留下吧~
 最新文章