如何在r中进行并行运算(一)

文摘   2025-01-30 14:50   安徽  



Vol.1

使⽤bench包进⾏基准测试

library(bench)

x <- runif(n = 1000000)

bench::mark(

 sqrt(x),

 x ^ 0.5,

 x ^ (1 / 2),

 min_time = Inf, max_iterations = 1000

)


以下展⽰了三种计算平⽅根⽅法的性能评估。min和median分别代表每种⽅法的最⼩和中位数运⾏时间,可以看到sqrt的⽅法最快,其他两种更慢⼀点,且性能相近。itr代表每秒能够完成的迭代次数,mem_alloc代表每次运⾏的内存分配量,gc代表每秒触发垃圾回收的次

数。




Vol.2

parallel包介绍

parallel包是R中专门⽤于并⾏计算的包,可以利⽤多核处理器的能⼒来加速计算任务。

⼀些核心函数和用途

detectCores()

• 作⽤:检测系统中可⽤的核⼼数(CPU核数)。

• ⽤途:帮助⽤户根据硬件资源确定分配给并⾏任务的核⼼数。

• 注意:并⾮所有核⼼数都应该被占⽤,⼀般会留⾄少⼀个核⼼给系统。


pvec()

• 作⽤:⽤于对向量的映射函数进⾏并⾏化(基于Forking)。

• 参数: mc.cores 设置⽤于并⾏计算的核⼼数。

• ⽤途:适合处理简单的向量操作任务,例如对每个向量元素应⽤⼀个函数。


**mclapply()**

• 作⽤:是lapply()函数的并⾏版本(基于Forking)。

• 参数: mc.cores 设置⽤于并⾏计算的核⼼数。c.preschedule 是否预先分配任务到各核⼼(默认为TRUE,适合任务运⾏时间相近的情况)。 affinity.list 可以指定任务如何分配到特定核⼼(⽤于负载均衡)。

• ⽤途:适合复杂列表或数据框的逐项操作,例如对每⼀项应⽤特定函数。


**mcparallel() 和 mccollect()**

• mcparallel() —在单独的进程中异步评估R表达式(基于Forking)。

• mccollect() —收集由 mcparallel() 启动的并⾏进程的结果。适合需要⼿动管理进程的情况,适⽤于更⾼级别的并⾏任务。


硬件信息

• ⼀般笔记本电脑可能有8个核⼼。

• 硬件核⼼数决定了并⾏计算的上限,多核系统能显著提升性能。


核⼼选择:在开始并⾏计算前,使⽤ detectCores() 确定核⼼数,并合理分配给mc.cores。


适用场景:

• pvec() :向量操作。

• mclapply() :列表操作。

• mcparallel()和mccollect() :更灵活的任务管理。


Caution

Forking⽅法不⽀持Windows系统,如果在Windows上使⽤,需要考虑其他⽅法(例如parLapply()或snow包)。


负载均衡


sleepR <- function(x) {

 Sys.sleep(x) #让每个任务“暂停”x秒,模拟任务的计算时间。

 runif(1) #在暂停后返回⼀个随机数。

}


平衡负载计算

#输⼊向量和负载分配列表

x <- c(2.5, 2.5, 5) #三个任务的耗时分别为2.5秒、2.5秒和5秒

aff_list_bal <- c(1, 1, 2) #平衡负载分配:任务被分配给两个核⼼,核⼼1处理第⼀个和第⼆个任务;核⼼2处理第三个任务

aff_list_unbal <- c(1, 2, 2)#不平衡负载分配:任务分配不均,核⼼1处理第⼀个任务;核⼼2处理第⼆个和第三个任务


不平衡负载计算

平衡任务分配可以减少运⾏时间,提⾼并⾏效率。在平衡分配中,两个核⼼⼯作时间接近,效率最⾼。


Socket并⾏

detectCores()  # 检测可⽤的核⼼数

c1 <- makeCluster()  # 创建⼀个集群

result <- clusterApply(cl = c1, ...)  # 在集群中运⾏任务

stopCluster(c1)  # 关闭集群


*⽰例:

clust <- makeCluster(4) # 创建⼀个4核⼼的集群

library(nycflights13)  # 主进程中加载nycflights13包

clusterEvalQ(cl = clust, dim(flights)) # 在worker节点中执⾏

dim(flights)

stopCluster(clust)    # 停⽌并释放集群资源

*此时可能会出现这样的报错:

这是因为在Sockets并⾏化中,每个worker节点都是独⽴的R会话,它们不会⾃动继承主R会话中的数据、函数或加载的包。因此,虽然主会话加载了nycflights13包,但worker节点中没有加载这个包,也⽆法找到flights数据集。


解决⽅案1:

clust <- makeCluster(4)  # 创建⼀个4核⼼集群

clusterEvalQ(cl = clust, {

 library(nycflights13)  # 在每个worker节点加载nycflights13包

 dim(flights)      # 在worker节点上查看flights数据集的维度

})

stopCluster(clust)    # 停⽌并释放集群资源


解决⽅案2:

cl <- makeCluster(4)  # 创建⼀个4核⼼集群

library(nycflights13)  # 在主进程中加载nycflights13包

clusterExport(cl = cl, varlist = c("flights"))  # 将flights对象传递到worker节点

clusterEvalQ(cl = cl, {dim(flights)})  # 在worker节点中运⾏

dim(flights)

stopCluster(cl)  # 停⽌并释放集群资源

使⽤⽅案1:

• Worker节点需要访问特定包的内容(例如,直接加载包中的数据集)。

• 当⽆法提前将数据准备好时,这种⽅法更加灵活。


使⽤⽅案2:

• 数据已经在主进程中准备好,⽆需worker节点加载额外的包。

• 优先选择这种⽅法,减少worker节点的额外操作,提升效率。


最后输出的结果都是:


关于R中⼀系列并⾏化的apply函数的家族


负载均衡的意义:

• 普通版本(如parLapply()):将任务静态分配到不同的worker节点,如果任务耗

时不均,某些节点可能会空闲。

• 负载均衡版本(如parLapplyLB()):动态分配任务,确保每个节点尽可能保持忙

碌状态,提升资源利⽤率。


如何通过并⾏化实现bootstrap

#加载tidyverse,其中包括dplyr和ggplot2

library(tidyverse)

cl <- makeCluster(4)  # 创建⼀个4核⼼的集群

#定义bootstrap函数并⽣成样本

boot_samples <- clusterEvalQ(cl = cl, {

 library(dplyr)

 create_boot_sample <- function() {

  mtcars %>%

   select(mpg) %>%

   sample_n(size = nrow(mtcars), replace = TRUE)

}

 replicate(2500, create_boot_sample())

})


#并⾏化计算和结果可视化

map(boot_samples, ~parLapply(cl, X = ., fun = mean)) %>%

 unlist() %>%

 as_tibble() %>%

 ggplot(aes(x = value)) +

 geom_histogram() +

 theme_minimal(base_size = 16)

 stopCluster(cl)




PSYCH统计实验室

通知公告

网络分析课程目前开放视频课啦

单次课200元/讲(学生),250元/讲(非学生)

共有四讲内容:

①横断面网络分析简介与基础

②网络分析与因子分析

③交叉滞后网络分析

④时间序列网络分析

购买后开放视频权限14天,可多次申请。

并赠送所有课程相关资料(无PPT)

如果想申请购买,请联系M18812507626



更多资讯

关注我们


文稿:Traveler

排版:Peruere

审核:摘星

本文由“Psych统计自习室”课题组原创,欢迎转发至朋友圈。如需转载请联系后台,征得作者同意后方可转载。












Psych统计自习室
大家好,我们是由来自北京师范大学,西南大学,天津医科大学等高校在读硕士、博士研究生组成的一个科研团队——Psych统计自习室。Psych统计自习室旨在关注心理学、精神病学领域的最前沿的系列研究,并做前沿统计知识的分享。
 最新文章