bthread用户接口
bthread_t(类似pthread_t):64位int,前32位版本号防止aba问题(释放后又重新被分配了,歧义),后32位为资源池(无全局竞争可o(1)访问的数组)中下标,可以很快地找到taskmeta(里面有个变量就是这个bth的栈)
起bth基本函数
bthread_start_urgent & bthread_start_background 里面都是先判断g是否为空(是否运行在worker里),是的话运行start_xxx,不是运行start_from_non_worker(直接调用start_background
- start_foreground:如果当前在bthread内,就在当前worker内原地启动(调ready_to_run或ready_to_run_remote)保证locality,把当前bth加到_rq队尾,再去跑新bthread,实现:set_remained(ready_to_run(current_bth)) sched_to(new_bth)
- start_background:后台起,但是不希望立刻跑这个bthread,将来有时间跑就行,直接调ready_to_run把新bth加到_rq队尾,如果是在start_from_non_worker里起就加到_remote_rq,实现:ready_to_run[_remote](new_bth)
// 让出当前worker立即执行新bthread,当前bthread随后调度
int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) __throw {
bthread::taskgroup* g = bthread::tls_task_group;
if (g) {
// 从worker里起的bthread,调ready_to_run,再主动调一遍sched_to去执行这个tid
return bthread::taskgroup::start_foreground(&g, tid, attr, fn, arg);
}
// 一些初始化的工作,最后会创建1个tc、4个pl、9个pthread、9个tg
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
// 放到队列里,不急着切线程(sched_to)
int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) __throw {
bthread::taskgroup* g = bthread::tls_task_group;
if (g) {
// 从worker里起的bthread,调ready_to_run
return g->start_background(tid, attr, fn, arg);
}
// 同上
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
// 核心函数:创建taskgroup,见下
taskcontrol* c = get_or_new_task_control();
...
// 选择一个taskgroup执行start_background,调ready_to_run_remote
return c->choose_one_group()->start_background(
tid, attr, fn, arg);
}
inline taskcontrol* get_or_new_task_control() {
// 全局变量tc(g_task_control)初始化原子变量
butil::atomic* p = (butil::atomic*)&g_task_control;
// 通过原子变量进行load,取出tc指针,如果不为空,直接返回
taskcontrol* c = p->load(butil::memory_order_consume);
if (c != null) {
return c;
}
...
// 走到这,说明tc确实为null,开始new一个
c = new (std::nothrow) taskcontrol;
if (null == c) {
return null;
}
// 用并发度concurrency来初始化全局tc
int concurrency = flags_bthread_min_concurrency > 0 ?
flags_bthread_min_concurrency :
flags_bthread_concurrency;
// init函数核心见下
if (c->init(concurrency) != 0) {
log(error) << "fail to init g_task_control";
delete c;
return null;
}
// 4. 将全局tc存入原子变量中
p->store(c, butil::memory_order_release);
return c;
}
// taskcontrol的init函数
for (int i = 0; i < _concurrency; i) {
const int rc = pthread_create(&_workers[i], null, worker_thread, this);
if (rc) {
log(error) << "fail to create _workers[" << i << "], " << berror(rc);
return -1;
}
}
// worker_thread之前有过说明
【代码说明】若taskgroup为空,证明当前是跑在pthread上,调用start_from_non_worker检查是否已经创建了tackcontrol单例,已经创建就无所谓了,没有就new taskcontrol去创建,new后会执行tackcontrol的init,核心就是用pthread启动指定数量的worker。得到tackcontrol单例后,用tackcontrol选取一个tackgroup,新建bthread进行调度;taskgroup不为空,表明此bthread就是在worker内被创建的,在对应tackgroup(worker)执行新bthread的启动即可。
两个常见的阻塞操作
- yield:把当前的bth加到_rq队尾,执行调度函数选下一个,实现:set_remained(ready_to_run(current_bth)) sched
- usleep:涉及超时管理,先从当前队里摘掉,把当前bth加到_remote_rq里,当定时器到了,加到_rq里运行,实现:add timer(ready_to_run_remote(current_bth)) sched
基本流程
tc创建后会pthread_create 9个线程去初始化tg,避免全局变量加锁处理,效率低,也要避免惊群。一旦出现某一个bthread可以被偷了会唤醒很多worker,会发生惊群,要处理。
void* taskcontrol::worker_thread(void* arg) {
// 获取tc指针
taskcontrol* c = static_cast(arg);
// 创建一个task_group
taskgroup* g = c->create_group();
taskstatistics stat;
if (null == g) {
log(error) << "fail to create taskgroup in pthread=" << pthread_self();
return null;
}
// 重要变量,定义baidu_thread_local taskgroup* tls_task_group;
// tls_task_group是一个thread_local变量,且只有由taskcontrol启动的worker线程所拥有的task_group tls_task_group非null
tls_task_group = g;
// 计数 1
c->_nworkers << 1;
// 当前线程主要运行的任务,即死循环中等待唤醒
g->run_main_task();
// ...
// 销毁
tls_task_group = null;
g->destroy_self();
c->_nworkers << -1;
return null;
}
每个tg创建后会创建后当前线程的各种变量,然后去调 run_main_task 去循环等待是否有可处理的bthread。
void taskgroup::run_main_task() {
// ......
taskgroup* dummy = this;
bthread_t tid;
// 这个while就是卡住等bthread id,这个id可能是从队列里取出来的,也可能是偷到的
while (wait_task(&tid)) { // 1
// 这里的唤醒是用futex实现的,也就是利用一个原子变量判断是否需要去尝试加锁访问,然后在实际去加锁访问
// 走到这里就说明已经获取到一个可执行的bthread,那么调sched_to去切线程
taskgroup::sched_to(&dummy, tid); // 2
dcheck_eq(this, dummy);
dcheck_eq(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
// 只有当前线程不是调度bthread线程的时候才会去执行用户func
taskgroup::task_runner(1/*skip remained*/); // 3
}
// ......
}
// 1 每个worker调度线程卡在的位置
bool taskgroup::wait_task(bthread_t* tid) {
do {
// ...
_pl->wait(_last_pl_state);
// wait:内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞
// 当阻塞结束之后,执行steal_task()来work stealing,如果窃取成功则返回,不成功就继续wait
if (steal_task(tid)) {
// 顺序为先看本地_remote_rq再去其他worker偷
// 偷的顺序为先偷别_rq的再偷_remote_rq的
return true;
}
} while (true);
}
// 2 用内核调用实现上下文的切换
inline void taskgroup::sched_to(taskgroup** pg, bthread_t next_tid) {
// 根据tid找taskmeta
taskmeta* next_meta = address_meta(next_tid);
// 给stk用next_meta赋一些值
if (next_meta->stack == null) {
contextualstack* stk = get_stack(next_meta->stack_type(), task_runner);
// ...
}
// 重载的sched_to会判断next_meta和cur_meta是否为同一个不是同一个需要调jump_stack去切栈
// 这其中直接嵌入了汇编代码模拟pthread切栈操作
sched_to(pg, next_meta);
}
// 3 执行用户实际的func并找下一个
void taskgroup::task_runner() {
taskmeta* const m = g->_cur_meta;
// 执行应用程序设置的任务函数,在任务函数中可能yield让出cpu,也可能产生新的bthread
m->fn(m->arg);
// 任务函数执行完成后,需要唤起等待该任务函数执行结束的pthread/bthread
butex_wake_except(m->version_butex, 0);
// ending_sched函数本意:将pthread线程执行流转入下一个可执行的bthread(普通bthread或“调度bthread”)
// 在ending_sched()内会尝试从本地taskgroup的rq队列中找出下一个bthread
// 或者调steal_task从其他pthread的taskgroup上steal一个bthread(当然也是先看_remote_rq,再偷)
// 如果没有bthread可用则下一个被执行的就是pthread的“调度bthread”
// 然后通过sched_to()将pthread的执行流转入下一个bthread的任务函数
ending_sched(&g);
}
【场景】以proxy为例首先在mixer-framework里已经调用过了bthread_start_background初始化一些必要的东西(tc、tg等),然后在send_minibs_request_impl函数中用bthread_start_background则是相当于在bthread内启动了一个新的bthread,想要执行函数func,由于这个新bth不是在pthread内起的,所以肯定会被放在该taskgroup的_remote_rq中,然后,这个阻塞在wait_task上的taskgroup会被唤醒,进入以下步骤取出tid来:
- 首先从本taskgroup的_remote_rq队列中pop出一个tid(_remote_rq.pop(tid)),pop成功就直接返回
- 如果pop失败了,调用steal_task从其它taskgroup中去偷一个tid,偷的顺序在上面
无论如何拿到tid后,进入taskgroup::sched_to去切栈,具体栈跳跃步骤为:
1、调用jump_stack进入新的bthread栈,由于这时新的bthread还没有分配栈,会首先为它创建一个栈new_stack
2、新的bthread创建完后,利用bthread_jump_fcontext修改栈顶指针、各寄存器进入新栈new_stack,并记录跳转前的旧栈stack_main的相关内容
3、新栈的执行函数fn为taskgroup::task_runner(注意这个就是用户的func,层层函数指针传入),这个task_runner主要会干3个事儿
- 执行用户实际的回调函数func(用户想让bthread干的事)
- 唤起等待该任务函数执行结束的pthread/bthread
- ending_sched:尝试再pop出一个bthread来,如果现在没有新的bthread,就把_main_meta作为下一次的跳转对象(相当于返回去继续执行原来的代码),再次调用jump_stack由new_stack跳转入stack_main,相当于又拿到了一个tid继续运行
4、都运行完了继续下一次wait_task中的do while死循环,重新阻塞在_pl→wait(...)上