0 前言
推进掌握一门知识的方法除了温故知新
之外,还可以是触类旁通
. 近期沉寂停更的时间里,我在经历自学 c++
理论知识的入门阶段,不过大家都知道“纸上得来终觉浅”
的道理,因此我决定以复刻 golang gmp
协程调度设计思路为目标,基于c++11
的风格实现一个低配乞丐版协程调度框架
,以此作为我的首个 c++ 实践开源项目,并希望以此为契机,在提高 c++
编程熟练度的同时,也能提供一波旁支输入,反补提升我对gmp
概念的理解.
该项目我已于 github 开源,cbricks
是我基于 c++11
从零实现的基础工具开源库:https://github.com/xiaoxuxiansheng/cbricks.
其中实现的内容包括但不仅限于 协程调度框架 workerpool
、协程 coroutine/线程 thread
、并发通信队列 channel
、日志打印组件 logger
等基本工具类,而 协程调度框架 workerpool
正是我今天我要向大家介绍的主题.
这是我作为 c++ 初学者推进的首个开源项目,完全出于学习实践目的,难免存在水平不足
以及重复造轮
的问题,如有考虑不到位、不完善之处请多多包涵,也欢迎批评指正~
在开始正文前,
致敬环节
必不可少. 在实现 cbricks 的编程过程中,在很大程度上学习借鉴了sylar-yin 老师
的课程,在此特别感谢,也附上其开源项目传送门供大家参考使用:https://github.com/sylar-yin/sylar . 正因为有前辈们慷慨无私的倾囊分享,我的学习之路才得以更加平坦顺畅. 正是以上种种鼓励着我能有动力把技术分享以及这种开源精神继续传播下去.
1 基本概念
首先,需要大家一起理清楚有关协程
的基本概念.
1.1 线程与协程
我们通常所熟知的最小调度单元称为线程(thread)
,亦指内核级线程
,其创建、销毁、调度过程需要交由操作系统内核
负责. 线程
与进程
可以是多对一关系,可以充分利用 CPU 多核
的能力,提高程序运行的并发度
.
而协程(coroutine)
又称为用户级线程
,是在用户态
视角下对线程概念的二次封装
. 一方面,协程与线程关系为多对一,因此在逻辑意义
上属于更细粒度的调度单元
;另一方面,因为协程的创建、销毁、调度
行为都在用户态
中完成,而无需内核参与,因此协程是一个更加轻量化
的概念. (对于内核来说,其最小调度单元始终为线程不变,至于用户态下对线程又作了怎样的逻辑拆分,对于内核而言是完全透明无感知的,因此无需介入)
1.2 coroutine 与 goroutine
因为我毕竟有着较长的 golang 开发使用经验,需要在探讨相关问题的时候是无法绕开对 golang 中对 goroutine
这一设计的对比与探讨的.
我们把常规的协程称为 coroutine
. 而在 golang
语言层面天然支持一种优化版协程
模型,称为 goroutine
,并运转调度于 go 语言中知名的 gmp(goroutine-machine-processor)
架构之下.
有关 gmp
相关内容更细致的讲解,可以参见我之前分享的文章:golang gmp 原理
在经历了 cbricks workerpool
的开发实践后,我也对 gmp
架构下 groutine
相较于普通 coroutine
存在的优势有了一些更深刻的体会:
• 线程松耦合:经由
P
的从中斡旋,goroutine
能够跨 M(thread)调度运行
,真正实现 G 与 M 之间的松耦合. 而这一点我所实现的 c++ coroutine 中无法做到.(本文实现的 coroutine 底层依赖于 c 中的ucontext
库完成协程栈空间的分配
,由于栈的线程私有性
,一经分配便不允许被其他线程染指,因此 coroutine 在初始化后就必然是某个 thread 所独有的)• 栈空间自适应扩缩:
goroutine 栈空间
大小可以根据实际需要做到自适应扩缩
,并且针对使用方完全屏蔽这一细节. 而我所实现的c++ coroutine
需要在其初始化
时就显式设定好栈空间大小
,并且在运行过程中不便于修改.
• 阻塞粒度适配:这一点非常重要. golang 为使用方屏蔽了线程的概念,所有
并发操作
都基于goroutine 粒度
完成,这不仅限于调度,也包括与之相应的一系列并发阻塞工具
,例如锁 mutex
,通道 channel
等,都在语言层面天然支持goroutine 粒度的被动阻塞(go_park)
操作,与 gmp 体系完美适配;而这一点在 c++ 中则有所不同,如锁 mutex
、信号量 semaphore
等工具的最小阻塞粒度
都是线程
,这就会导致协程的优势遭到削弱,因为在一个 coroutine 中的阻塞行为最终会上升到 thread 粒度,并进而导致 thread 下其他 coroutine 也无法得到正常调度.
2 快速上手
做完基本概念铺垫后,下面我们开始介绍有关协程调度框架 cbricks workerpool
的具体实现内容.
2.1 使用方法
本章我们聚焦在如何快速上手使用 workerpool
这一问题. workerpool
类型声明于 ./pool/workerpool.h 头文件中,使用方通常只需关心其构造函数
和两个公开方法
:
• 构造函数——WorkerPool:初始化
workerpool
实例,其中唯一入参threads
为需要启用的线程个数
,默认为 8 个• 公开方法——submit:往
workerpool
中投递一个任务 task
(以void() 闭包函数
的形式)• 公开方法——sched:
主动让渡
当前task
的执行权,以实现同线程
下协程间的切换
// 命名空间 cbricks::pool
namespace cbricks{namespace pool{
// 协程调度池
classWorkerPool: base::Noncopyable{
public:
// 构造函数 threads——使用的线程个数. 默认为 8 个
WorkerPool(size_t threads =8);
// ...
public:
/**
公开方法
*/
/**
* submit: 向协程调度池中提交一个任务 (仿 golang 协程池 ants 风格)
- task 会被随机分配到线程手中,保证负载均衡
- task 被一个线程取到之后,会创建对应协程实例,为其分配本地栈,此时该任务和协程就固定属于一个线程了
* param:task——提交的任务 nonblock——是否为阻塞模式
- 阻塞模式:线程本地任务队列满时阻塞等待
- 非阻塞模式:线程本地队列满时直接返回 false
* response:true——提交成功 false——提交失败
*/
bool submit(task task, bool nonblock = false);
// 工作协程调度任务过程中,可以通过执行次方法主动让出线程的调度权 (仿 golang runtime.Goched 风格)
void sched();
}}
2.2 使用示例
下面是关于 workerpool
的具体使用示例,其中演示了如何完成一个 workerpool
的初始化,并通过 submit
方法向其中批量投递异步执行的任务
,最后对执行结果进行验收:
#include <iostream>
#include "sync/sem.h"
#include "pool/workerpool.h"
void testWorkerPool();
int main(int argc, char** argv){
// 测试函数
testWorkerPool();
}
void testWorkerPool(){
// 协程调度框架类型别名定义
typedef cbricks::pool::WorkerPool workerPool;
// 信号量类型别名定义
typedef cbricks::sync::Semaphore semaphore;
// 初始化协程调度框架,设置并发的 threads 数量为 8
workerPool::ptr workerPoolPtr(new workerPool(8));
// 初始化一个原子计数器
std::atomic<int> cnt{0};
// 初始化一个信号量实例
semaphore sem;
// 投递 10000 个异步任务到协程调度框架中,执行逻辑就是对 cnt 加 1
for(int i =0; i <10000; i++){
// 执行 submit 方法,将任务提交到协程调度框架中
workerPoolPtr->submit([&cnt,&sem](){
cnt++;
sem.notify();
});
}
// 通过信号量等待 10000 个异步任务执行完成
for(int i =0; i <10000; i++){
sem.wait();
}
// 输出 cnt 结果(预期结果为 10000)
std::cout << cnt << std::endl;
}
3 架构设计
了解完使用方式后,随后就来揭晓其底层实现原理
. 本着由总到分
的学习指导纲领,本章我们从全局视角纵览 workerpool 的设计实现架构
.
3.1 整体架构与核心概念
workerpool
自下而上,由粗到细可以分为如下层级概念:
• 线程池 threadPool:
workerpool
初始化时就启动指定数量
的常驻线程 thread 实例
. 这些 thread 数量固定不变,并且会持续运行,直到整个 workerpool 被析构为止. 由这些 thread 组成的集合,我们称为线程池 threadPool
.• 线程 thread:持续运营的
thread 单元
,不断执行着调度逻辑,依次尝试从本地任务队列 taskq
、本地协程队列 sched_q
中获取任务 task
/协程 coroutine
进行调度. 如果前两者都空闲,则 thread 会仿照 gmp 中的workstealing
机制,从其他 thread 的 taskq 中窃取 task 过来执行. 最后 steal 后仍缺少 task 供执行调度,则会利用channel
的机制,使thread
陷入阻塞,从而让出 cpu 执行权• 任务 task:用户提交的
异步任务
(对应为void() 闭包函数
类型). task 会被均匀分配
到特定 thread 的 taskq
中,但还存在被其他 thread 窃取的可能性,因此 task 本质上还是能够跨 thread 传递使用的• 协程 coroutine:在 workerpool 中,thread 不会直接执行 task,而是会为 task
一对一
构建出coroutine 实例
,并切换至 coroutine 中完成对 task 的执行. coroutine 被创建出来后,会完成栈 stack 的初始化和分配
,随后 coroutine 就固定属于一个 thread 了,终生不可再被其他 thread 染指• 线程本地任务队列 taskq:每个
thread 私有
的缓存 task 的队列
,底层由并发安全
的通信队列channel
实现. 当一笔 task 被投递到 workerpool 时,会基于负载均衡
策略投递到特定 thread 的 taskq
中,接下来会被该 thread 优先调度执行• 线程本地协程队列 schedq:每个
thread 私有
的缓存 coroutine 的队列
,底层由普通队列queue
实现,但属于线程本地变量thread_local
,因此也是并发安全
的. 当一个coroutine
因主动让渡 sched
操作而暂停执行
时,会将其暂存
到schedq
中,等待后续再找时机完成该 coroutine 的调度工作.
3.2 相比 gmp 的不足之处
我在实现 workerpool
时,一定程度上仿照了 gmp
的风格,包括 thread 本地任务队列 taskq
的实现以及 workstealing 机制
的设计.
然而受限于我的个人水平以及语言层面的风格差异,相较于 gmp
,workerpool
还存在几个明显的缺陷:
• coroutine 与 thread 强绑定:当一个 coroutine 被初始化时,我使用的是 c 语言中
ucontext.h
完成stack 的分配
,这样 coroutine stack 就是thread 私有
的,因此 coroutine 不能做到跨 thread 调度.• thread 级阻塞粒度:
c++
中,并发工具因此的阻塞行为
都是以 thread 为单位
. 以互斥锁 lock 为例,哪怕触发加锁阻塞行为的对象是 coroutine,但最终还是会引起整个 thread 对象陷入阻塞,从而导致 thread 下的其他已分配好的 coroutine 也无法得到执行.要解决这一问题,就必须连带着对 lock、cond、semaphore 等工具进行改造,使得其能够支持 coroutine 粒度的阻塞操作,这样的成本无疑很高,本项目未予以实践.
4 头文件源码
从第 4 章开始,我们正式进入源码解析
环节. 首先给出关于 workerpool 头文件
的完整代码展示,包含其中的成员属性
以及公私方法定义
. 下面的示意图以及源码中给出的注释相对比较完备,在此不再赘述:
代码位于 ./pool/workerpool.h
:
// 保证头文件内容不被重复编译
#pragma once
/**
依赖的标准库头文件
*/
// 标准库智能指针相关
#include <memory>
// 标准库函数编程相关
#include <functional>
// 标准库原子量相关
#include <atomic>
// 标准库——动态数组,以此作为线程池的载体
#include <vector>
/**
依赖的项目内部头文件
*/
// 线程 thread 实现
#include "../sync/thread.h"
// 协程 coroutine 实现
#include "../sync/coroutine.h"
// 阻塞队列 channel 实现 (一定程度上仿 golang channel 风格)
#include "../sync/channel.h"
// 信号量 semaphore 实现
#include "../sync/sem.h"
// 拷贝禁用工具,用于保证类实例无法被值拷贝和值传递
#include "../base/nocopy.h"
// 命名空间 cbricks::pool
namespace cbricks{namespace pool{
// 协程调度池 继承 Noncopyable 保证禁用值拷贝和值传递功能
classWorkerPool: base::Noncopyable{
public:
// 协程池共享指针类型别名
typedef std::shared_ptr<WorkerPool> ptr;
// 一笔需要执行的任务
typedef std::function<void()> task;
// 一个线程持有的本地任务队列
typedef sync::Channel<task> localq;
// 本地任务队列指针别名
typedef localq::ptr localqPtr;
// 线程指针别名
typedef sync::Thread* threadPtr;
// 一个分配了运行任务的协程
typedef sync::Coroutine worker;
// 协程智能指针别名
typedef sync::Coroutine::ptr workerPtr;
// 读写锁别名
typedef sync::RWLock rwlock;
// 信号量类型别名
typedef sync::Semaphore semaphore;
public:
/**
构造/析构函数
*/
// 构造函数 threads——使用的线程个数. 默认为 8 个
WorkerPool(size_t threads =8);
// 析构函数
~WorkerPool();
public:
/**
公开方法
*/
/**
* submit: 向协程调度池中提交一个任务 (仿 golang 协程池 ants 风格)
- task 会被随机分配到线程手中,保证负载均衡
- task 被一个线程取到之后,会创建对应协程实例,为其分配本地栈,此时该任务和协程就固定属于一个线程了
* param:task——提交的任务 nonblock——是否为阻塞模式
- 阻塞模式:线程本地任务队列满时阻塞等待
- 非阻塞模式:线程本地队列满时直接返回 false
* response:true——提交成功 false——提交失败
*/
bool submit(task task, bool nonblock = false);
// 工作协程调度任务过程中,可以通过执行次方法主动让出线程的调度权 (仿 golang runtime.Goched 风格)
void sched();
private:
/**
* thread——workerPool 中封装的线程类
* - index:线程在线程池中的 index
* - thr:真正的线程实例,类型为 sync/thread.h 中的 Thread
* - taskq:线程的本地任务队列,其中数据类型为闭包函数 void()
* - lock:一把线程实例粒度的读写锁. 用于隔离 submit 操作和 workstealing 操作,避免因任务队列阻塞导致死锁
*/
structthread{
typedef std::shared_ptr<thread> ptr;
int index;
threadPtr thr;
localqPtr taskq;
rwlock lock;
/**
* 构造函数
* param: index: 线程在线程池中的 index; thr: 底层真正的线程实例; taskq:线程持有的本地任务队列
*/
thread(int index,threadPtr thr, localqPtr taskq):index(index),thr(thr),taskq(taskq){}
~thread()=default;
};
private:
/**
私有方法
*/
// work:线程运行主函数,持续不断地从本地任务队列 taskq 或本地协程队列 t_schedq 中获取任务/协程进行调度. 倘若本地任务为空,会尝试从其他线程本地任务队列窃取任务执行
void work();
/**
* readAndGo:从指定的任务队列中获取任务并执行
* param:taskq——指定的任务队列 nonblock——是否为阻塞模式
* reponse:true——成功 false——失败
*/
bool readAndGo(localqPtr taskq, bool nonblock);
/**
* goTask: 为一笔任务创建一个协程实例,并调度该任务函数
* param: cb——待执行任务
* tip:如果该任务未一次性执行完成(途中使用了 sched 方法),则会在栈中封存好任务的执行信息,然后将该协程实例追加到线程本地的协程队列 t_schedq 中,等待后续再被线程调度
*/
void goTask(task cb);
/**
* goWorker:调度某个协程实例,其中已经分配好执行的任务函数
* param: worker——分配好执行任务函数的协程实例
* tip:如果该任务未一次性执行完成(途中使用了 sched 方法),则会在栈中封存好任务的执行信息,然后将该协程实例追加到线程本地的协程队列 t_schedq 中,等待后续再被线程调度
*/
void goWorker(workerPtr worker);
/**
* workStealing:当其他线程任务队列 taskq 中窃取半数任务填充到本地队列
*/
void workStealing();
/**
* workStealing 重载:从线程 stealFrom 的任务队列中窃取半数任务填充到线程 stealTo 本地队列
*/
void workStealing(thread::ptr stealTo, thread::ptr stealFrom);
/**
* getStealingTarget:随机获取一个线程作为窃取目标
*/
thread::ptr getStealingTarget();
/**
* getThreadByThreadName 通过线程名称获取对应的线程实例
*/
thread::ptr getThreadByThreadName(std::string threadName);
/**
* getThread 获取当前线程实例
*/
thread::ptr getThread();
private:
/**
* 静态私有方法
*/
// getThreadNameByIndex:通过线程 index 映射得到线程名称
static const std::string getThreadNameByIndex(int index);
// getThreadIndex:获取当前线程的 index
static const int getThreadIndex();
// getThreadName:获取当前线程的名称
static const std::string getThreadName();
private:
/**
* 私有成员属性
*/
// 基于 vector 实现的线程池,元素类型为 WorkerPool::thread 对应共享指针
std::vector<thread::ptr> m_threadPool;
// 基于原子变量标识 workerPool 是否已关闭
std::atomic<bool> m_closed{false};
};
}}
5 核心实现源码
接下来针对 workerpool
中的核心流程进行详细的源码走读,有关 workerpool
具体实现代码位于 ./pool/workerpool.cpp
中.
5.1 依赖的头文件与变量
首先涉及到两个核心变量的定义:
• 全局变量 s_taskId:
全局单调递增
的原子计数器
,为每个到来的 task 分配全局唯一 task id
,并依据此 id 明确 task 应该指派给哪个 thread• 线程本地变量(thread_local) t_schedq:
线程私有
的协程队列
. 运行过程因主动让渡
而暂停的 coroutine,会被暂存
到其中,等待后续被相同的 thread
继续调度执行.
// 标准库队列实现. 依赖队列作为线程本地协程队列的存储载体
#include <queue>
// workerpool 头文件
#include "workerpool.h"
// 本项目定义的断言头文件
#include "../trace/assert.h"
// namespace cbricks::pool
namespace cbricks{namespace pool{
/**
* 全局变量 s_taskId:用于分配任务 id 的全局递增计数器,通过原子变量保证并发安全
* 每个任务函数会根据分配到的 id,被均匀地分发给各个线程,以此实现负载均衡
*/
static std::atomic<int> s_taskId{0};
/**
* 线程本地变量 t_schedq:线程私有的协程队列
* 当线程下某个协程没有一次性将任务执行完成时(任务调用了 sched 让渡函数),则该协程会被暂存于此队列中,等待后续被相同的线程继续调度
*/
staticthread_local std::queue<WorkerPool::workerPtr> t_schedq;
// ...
}}
5.2 构造函数与析构函数
下面介绍workerpool
的构造函数
,其任务很明确,就是初始化
好指定数量的 thread
,为其分配好对应的 taskq
,并将 thread 一一投递
进入到线程池 threadPool
中.
此处值得一提的是,thread 启动后异步运行的方法是 WorkerPool::work,其中会涉及到从 threadPool 中取出当前 thread 实例的操作,因此这里需要通过信号量 semaphore 保证 thread 实例先被投递进入 threadPool 后,对应 WorkerPool::work 方法才能被放行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* workerpool 构造函数:
* - 初始化好各个线程实例 thread
* - 将各 thread 添加到线程池 m_threadPool 中
*/
WorkerPool::WorkerPool(size_t threads){
CBRICKS_ASSERT(threads >0,"worker pool init with nonpositive threads num");
// 为线程池预留好对应的容量
this->m_threadPool.reserve(threads);
/**
* 构造好对应于每个 thread 的信号量
* 这是为了保证 thread 实例先被添加进入 m_threadPool,thread 中的调度函数才能往下执行
* 这样做是因为 thread 调度函数有依赖于从 m_threadPool 获取自身实例的操作
*/
std::vector<semaphore> sems(threads);
// 另一个信号量,用于保证所有 thread 调度函数都正常启动后,当前构造函数才能退出,避免 sems 被提前析构
semaphore waitGroup;
// 初始化好对应数量的 thread 实例并添加进入 m_threadPool
for(int i =0; i < threads; i++){
// 根据 index 映射得到 thread 名称
std::string threadName =WorkerPool::getThreadNameByIndex(i);
// 将 thread 实例添加进入 m_threadPool
this->m_threadPool.push_back(thread::ptr(
// thread 实例初始化
newthread(
i,
//
new sync::Thread([this,&sems,&waitGroup](){
/**
* 此处 wait 操作是需要等待对应 thread 实例先被推送进入 m_threadPool
* 因为一旦后续的 work 函数运行,就会涉及从 m_threadPool 中获取 thread 实例的操作
* 因此先后顺序不能颠倒
*/
sems[getThreadIndex()].wait();
/**
* 此处 notify 操作是配合外层的 waitGroup.wait 操作
* 保证所有 thread 都正常启动后,workerPool 构造函数才能退出
* 这是为了防止 sems 被提前析构
*/
waitGroup.notify();
// 异步启动的 thread,最终运行的调度函数是 workerpool::work
this->work();
},
// 注入 thread 名称,与 index 有映射关系
threadName),
// 分配给 thread 的本地任务队列
localqPtr(new localq))));
/**
* 在 thread 实例被推送入 m_threadPool 后进行 notify
* 这样 thread 调度函数才会被向下放行
*/
sems[i].notify();
}
/**
* 等待所有 thread 实例正常启动后,构造函数再退出
*/
for(int i =0; i < threads; i++){
waitGroup.wait();
}
}
在析构函数
中,要做的处理是将 workerpool 关闭标识 m_closed
置为 true,并且一一关闭
所有 thread 下的 taskq
,这样运行中的 thread
在感知到这一信息后都会主动退出
.
// 析构函数
WorkerPool::~WorkerPool(){
// 将 workpool 的关闭标识置为 true,后续运行中的线程感知到此标识后会主动退出
this->m_closed.store(true);
// 等待所有线程都退出后,再退出 workpool 的析构函数
for(int i =0; i <this->m_threadPool.size(); i++){
// 关闭各 thread 的本地任务队列
this->m_threadPool[i]->taskq->close();
// 等待各 thread 退出
this->m_threadPool[i]->thr->join();
}
}
// ...
}}
5.3 公有方法:提交任务
用户通过 submit
方法,能够将 task 提交
到 workerpool
中. 在 submit 流程中:
• 首先,为 task 分配
全局唯一
的taskId
.• 然后,对
threadPool 长度取模
后,找到 task从属的 thread
.• 接下来,将
task 投递
到该 thread 的taskq
中即可.
这里需要注意的是,在投递任务到 thread 的 taskq 前,需要先加上该 thread 的读锁 readlock. 这是为了和该 thread 下可能正在执行的 workStealing 操作进行互斥,避免因 taskq 空间不足而导致死锁问题. 这个点在窃取流程的讲解中详细展开.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* submit: 提交一个任务到协程调度池中,任务以闭包函数 void() 的形式组装
* - 为任务分配全局递增且唯一的 taskId
* - 根据 taskId 将任务均匀分发给指定 thread
* - 将任务写入到指定 thread 的本地任务队列中
*/
bool WorkerPool::submit(task task, bool nonblock){
// 若 workerpool 已关闭,则提交失败
if(this->m_closed.load()){
returnfalse;
}
// 基于任务 id 对 m_threadPool 长度取模,将任务映射到指定 thread
int targetThreadId =(s_taskId++)%(this->m_threadPool.size());
thread::ptr targetThr =this->m_threadPool[targetThreadId];
// 针对目标 thread 加读锁,这是为了防止和目标 thread 的 workstealing 操作并发最终因任务队列 taskq 容量溢出而导致死锁
rwlock::readLockGuard guard(targetThr->lock);
// 往对应 thread 的本地任务队列中写入任务
return targetThr->taskq->write(task, nonblock);
}
// ...
}}
5.4 公有方法:让渡执行权
task 在运行过程中,可以通过调用 workerpool::sched
方法完成执行权的主动让渡
. 此时 task
对应 coroutine
会暂停运行
,并将执行权切换回到 thread 主函数
中,然后 thread 会将该 coroutine 暂存
到本地协程队列 schedq
中,等待后续再对其调度执行.
// namespace cbricks::pool
namespace cbricks{ namespace pool{
// ...
// sched:让渡函数. 在任务执行过程中,可以通过该方法主动让出线程的执行权,则此时任务所属的协程会被添加到 thread 的本地协程队列 t_schedq 中,等待后续再被调度执行
void WorkerPool::sched(){
worker::GetThis()->sched();
}
// ...
}}
5.5 线程调度任务主流程
workerpool::work
方法是各 thread
循环运行的主函数
,其中包含了 thread 调度 task 和 coroutine
的核心逻辑:
• 调度优先级一:从 thread 的本地任务队列
taskq
中获取 task 并调度执行• 调度优先级二:当
taskq 为空
或者连续获取 10 次 taskq
后(为避免 schedq 产生饥饿),会主动获取一次本地协程队列 schedq
中的 coroutine 进行调度• 调度优先级三:如果
taskq
和schedq
都是空的,则进入workstealing
流程,尝试从其他 thread taskq
中窃取半数 taskq
填充到当前 thread taskq 中• 必要性阻塞:如果经历完上述流程,仍没有合适的目标供 thread 调度,则 thread 会依赖
channel 的阻塞消费能力陷入阻塞
,从而让出 cpu 执行权
,避免资源浪费
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* work: 线程运行的主函数
* 1) 获取需要调度的协程(下述任意步骤执行成功,则跳到步骤 2))
* - 从本地任务队列 taskq 中取任务,获取成功则为之初始化协程实例
* - 从本地协程队列 schedq 中取协程
* - 从其他线程的任务队列 taskq 中偷取一半任务到本地任务队列
* 2) 调度协程执行任务
* 3) 针对主动让渡而退出的协程,添加到本地协程队列
* 4) 循环上述流程
*/
void WorkerPool::work(){
// 获取到当前 thread 对应的本地任务队列 taskq
localqPtr taskq =this->getThread()->taskq;
// main loop
while(true){
// 如果 workerpool 已关闭 则主动退出
if(this->m_closed.load()){
return;
}
/**
* 执行优先级为 本地任务队列 taskq -> 本地协程队列 t_t_schedq -> 窃取其他线程任务队列 other_taskq
* 为防止饥饿,至多调度 10 次的 taskq 后,必须尝试处理一次 t_schedq
*/
// 标识本地任务队列 taskq 是否为空
bool taskqEmpty =false;
// 至多调度 10 次本地任务队列 taskq
for(int i =0; i <10; i++){
// 以【非阻塞模式】从 taskq 获取任务并为之分配协程实例和调度执行
if(!this->readAndGo(taskq,false)){
// 如果 taskq 为空,将 taskqEmpty 置为 true 并直接退出循环
taskqEmpty =true;
break;
}
}
// 尝试从线程本地的协程队列 t_schedq 中获取协程并进行调度
if(!t_schedq.empty()){
// 从协程队列中取出头部的协程实例
workerPtr worker = t_schedq.front();
t_schedq.pop();
// 进行协程调度
this->goWorker(worker);
// 处理完成后直接进入下一轮循环
continue;
}
// 如果未发现 taskq 为空,则无需 workstealing,直接进入下一轮循环
if(!taskqEmpty){
continue;
}
/**
* 走到这里意味着 taskq 和 schedq 都是空的,则要尝试发起窃取操作
* 随机选择一个目标线程窃取半数任务添加到本地队列中
*/
this->workStealing();
/**
* 以【阻塞模式】尝试从本地任务获取任务并调度执行
* 若此时仍没有可调度的任务,则当前 thread 陷入阻塞,让出 cpu 执行权
* 直到有新任务分配给当前 thread 时,thread 才会被唤醒
*/
this->readAndGo(taskq,true);
}
}
// ...
}}
以 readAndGo
方法为入口,thread 会尝试从 taskq
中获取一笔 task;获取到后,会为 task 构建一一对应的 coroutine 实例
(至此 task/coroutine 与 thread 完全绑定),然后通过 coroutine::go
方法,将 thread 执行权切换至 coroutine 手中,由 coroutine 执行其中的 task
. 只有在 task 执行结束
或者主动让渡
时,执行权
才会返还到 thread 主函数
中,此时 thread 会判断 coroutine 是否是因为主动让渡而暂停执行,如果是的话,则会将该 coroutine 实例追加到 schedq 中,等待后续寻找合适时机再作调度执行.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
/**
* readAndGo:
* - 从指定任务队列中获取一个任务
* - 为之分配协程实例并调度执行
* - 若协程实例未一次性执行完成(执行了让渡 sched),则将协程添加到线程本地的协程队列 schedq 中
* param:taskq——任务队列;nonblock——是否以非阻塞模式从任务队列中获取任务
* response:true——成功;false,失败(任务队列 taskq 为空)
*/
// 将一个任务包装成协程并进行调度. 如果没有一次性调度完成,则将协程实例添加到线程本地的协程队列 t_schedq
bool WorkerPool::readAndGo(cbricks::pool::WorkerPool::localqPtr taskq, bool nonblock){
// 任务容器
task cb;
// 从 taskq 中获取任务
if(!taskq->read(cb,nonblock)){
returnfalse;
}
// 对任务进行调度
this->goTask(cb);
returntrue;
}
/**
* goTask
* - 为指定任务分配协程实例
* - 执行协程
* - 若协程实例未一次性执行完成(执行了让渡 sched),则将协程添加到线程本地的协程队列 schedq 中
* param:cb——待执行的任务
*/
void WorkerPool::goTask(task cb){
// 初始化协程实例
workerPtr _worker(newworker(cb));
// 调度协程
this->goWorker(_worker);
}
/**
* goWorker
* - 执行协程
* - 若协程实例未一次性执行完成(执行了让渡 sched),则将协程添加到线程本地的协程队列 schedq 中
* param:worker——待运行的协程
*/
void WorkerPool::goWorker(workerPtr worker){
// 调度协程,此时线程的执行权会切换进入到协程对应的方法栈中
worker->go();
// 走到此处意味着线程执行权已经从协程切换回来
// 如果此时协程并非已完成的状态,则需要将其添加到线程本地的协程队列 schedq 中,等待后续继续调度
if(worker->getState()!= sync::Coroutine::Dead){
t_schedq.push(worker);
}
}
// ...
}}
5.6 任务窃取流程
当 thread 发现 taskq 和 schedq 都空闲
时,则会尝试执行窃取
操作. 此时 thread 随机选取
另一个 thread 作为窃取目标,窃取
其 taskq 中的半数 task
,追加到本地 taskq 中.
在执行窃取操作的过程中,需要对当前 thread 加写锁,以避免发生死锁问题:
比如在窃取前,当前 thread 判定自己的 taskq 还有足够空间用于承载窃取来的 task;但是此期间若有新的任务 submit 到来,则可能把 taskq 的空间占据,最后导致没有足够容量承载窃取到的 task,最终导致 thread 调度流程 hang 死在 workstealing 流程无法退出.
上述问题的解法就是,在窃取前,先加 thread 写锁(这样并发到来的 submit 操作就无法完成 task 投递)然后再检查一遍 taskq 并确认容量充足后,再发起实际的窃取操作.
// namespace cbricks::pool
namespace cbricks{namespace pool{
// ...
// 从某个 thread 中窃取一半任务给到本 thread 的 taskq
void WorkerPool::workStealing(){
// 选择一个窃取的目标 thread
thread::ptr stealFrom =this->getStealingTarget();
if(!stealFrom){
return;
}
// 从目标 thread 中窃取半数任务添加到本 thread taskq 中
this->workStealing(this->getThread(),stealFrom);
}
// 从 thread:stealFrom 中窃取半数任务给到 thread:stealTo
void WorkerPool::workStealing(thread::ptr stealTo, thread::ptr stealFrom){
// 确定窃取任务数量:目标本地任务队列 taskq 中任务总数的一半
int stealNum = stealFrom->taskq->size()/2;
if(stealNum <=0){
return;
}
// 针对 thread:stealTo 加写锁,防止因 workstealing 和 submit 行为并发,导致线程因 taskq 容量溢出而发生死锁
rwlock::lockGuard guard(stealTo->lock);
// 检查此时 stealTo 中的 taskq 如果剩余容量已不足以承载拟窃取的任务量,则直接退出
if(stealTo->taskq->size()+ stealNum > stealTo->taskq->cap()){
return;
}
// 创建任务容器,以非阻塞模式从 stealFrom 的 taskq 中窃取指定数量的任务
std::vector<task> containers(stealNum);
if(!stealFrom->taskq->readN(containers,true)){
return;
}
// 将窃取到的任务添加到 stealTo 的 taskq 中
stealTo->taskq->writeN(containers,false);
}
// 随机选择本 thread 外的一个 thread 作为窃取的目标
WorkerPool::thread::ptr WorkerPool::getStealingTarget(){
// 如果线程池长度不足 2,直接返回
if(this->m_threadPool.size()<2){
returnnullptr;
}
// 通过随机数,获取本 thread 之外的一个目标 thread index
int threadIndex =WorkerPool::getThreadIndex();
int targetIndex =rand()%this->m_threadPool.size();
while( targetIndex == threadIndex){
targetIndex =rand()%this->m_threadPool.size();
}
// 返回目标 thread
returnthis->m_threadPool[targetIndex];
}
// ...
}}
6 总结
祝贺,至此本文结束. 本篇和大家探讨了,如何基于 c++ 从零到一实现一个协程调度框架,其核心功能包括:
• 创建指定数量线程持续复用,调度后续到来的任务
• 以闭包函数的风格提交任务到框架中,由异步协程完成执行
• 任务运行过程中支持通过主动让渡操作让出调度执行权
• 支持线程间的任务窃取操作,使得各调度线程间忙闲有度、负载均衡