First, let's clarify a few concepts: what are brpc, bthread, and coroutines?

- brpc
The industrial-grade RPC framework most widely used inside Baidu. RPC models network interaction as "a client calling a function on the server": the client sends a request and waits until the server receives it, handles it, and replies; the client then resumes and reacts to the response.
- bthread
bthread is the coroutine library developed for brpc; it can loosely be understood as a lightweight thread.
- Coroutine
A coroutine is concurrency within a single thread, also called a micro-thread or fiber. It is another way to achieve multitasking, just with a smaller execution unit than a thread. Since a coroutine carries its own CPU context, we can switch from one coroutine to another at any suitable point.

As the title says, the essence of brpc lies entirely in bthread, and bthread is the "coroutine" runtime developed for brpc. So how do processes, threads, and bthreads relate? A process can spawn multiple threads; as for threads versus coroutines, WeChat's open-sourced libco maps thread : coroutine = 1 : N, while bthread maps thread : coroutine = M : N, and the key to bthread's implementation is the work-stealing algorithm, expanded on later.
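Before dissecting the internals, here is a minimal usage sketch of the API this article walks through (assuming brpc is installed; the function names `echo` and the argument string are mine):

```cpp
#include <bthread/bthread.h>
#include <cstdio>

// Start one bthread and wait for it -- the same entry points
// (bthread_start_background / bthread_join) dissected below.
static void* echo(void* arg) {
    printf("hello from bthread, arg=%s\n", (const char*)arg);
    return NULL;
}

int main() {
    bthread_t tid;
    // NULL attr means default attributes (BTHREAD_ATTR_NORMAL).
    if (bthread_start_background(&tid, NULL, echo, (void*)"demo") != 0) {
        fprintf(stderr, "fail to start bthread\n");
        return -1;
    }
    bthread_join(tid, NULL);  // block until the bthread finishes
    return 0;
}
```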
bthread has three core components:

- TaskControl (one per process)
- TaskGroup (one per worker thread)
- TaskMeta (the context of a bthread)

Since the number of functions involved is huge, I drew a table summarizing the overall idea of each function, to make things easier to follow.

We'll start from the creation functions and drill down, expanding the parts I understand as we go. First up are the bthread creation functions.

bthread creation functions

Below I walk through the two bthread creation functions one by one.
First we try to fetch bthread's thread-local tls_task_group, whose definition is:

```cpp
__thread TaskGroup* tls_task_group = NULL;
```

So when there is no tls_task_group (i.e. we are not on a worker thread), execution reaches the return statement calling start_from_non_worker(), which we will look at right after the two entry points below.
```cpp
// Entry #1:
int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // start from worker
        return g->start_background<false>(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

// Entry #2:
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // start from worker
        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
```
start_from_non_worker

At the start of this function we get, or lazily create, the TaskControl via get_or_new_task_control().
```cpp
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
    TaskControl* c = get_or_new_task_control();
    if (NULL == c) {
        return ENOMEM;
    }
    if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
        // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
        // 1. NOSIGNAL is often for creating many bthreads in batch,
        //    inserting into the same TaskGroup maximizes the batch.
        // 2. bthread_flush() needs to know which TaskGroup to flush.
        TaskGroup* g = tls_task_group_nosignal;
        if (NULL == g) {
            g = c->choose_one_group();
            tls_task_group_nosignal = g;
        }
        return g->start_background<true>(tid, attr, fn, arg);
    }
    return c->choose_one_group()->start_background<true>(
        tid, attr, fn, arg);
}
```
Now let's look at that function's definition:

get_or_new_task_control()

At the start, the global g_task_control is viewed through an atomic pointer; we load the TaskControl pointer and return it directly if it is non-NULL. Otherwise we take the mutex (BAIDU_SCOPED_LOCK on g_task_control_mutex) and repeat the load. If the pointer is still NULL even under the lock, the TaskControl really doesn't exist yet, so we new one, init() it with the concurrency degree, then store it into the atomic with release order and return it.
```cpp
inline TaskControl* get_or_new_task_control() {
    butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
    TaskControl* c = p->load(butil::memory_order_consume);
    if (c != NULL) {
        return c;
    }
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    c = p->load(butil::memory_order_consume);
    if (c != NULL) {
        return c;
    }
    c = new (std::nothrow) TaskControl;
    if (NULL == c) {
        return NULL;
    }
    int concurrency = FLAGS_bthread_min_concurrency > 0 ?
        FLAGS_bthread_min_concurrency :
        FLAGS_bthread_concurrency;
    if (c->init(concurrency) != 0) {
        LOG(ERROR) << "Fail to init g_task_control";
        delete c;
        return NULL;
    }
    p->store(c, butil::memory_order_release);
    return c;
}
```
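This is the classic double-checked locking singleton. As a standalone illustration of the same idiom (my sketch with standard C++ atomics, not brpc code):

```cpp
#include <atomic>
#include <mutex>

struct Singleton { /* ... */ };

static std::atomic<Singleton*> g_inst{nullptr};
static std::mutex g_inst_mutex;

Singleton* get_or_new_singleton() {
    // Fast path: no lock once initialized.
    Singleton* p = g_inst.load(std::memory_order_acquire);
    if (p != nullptr) {
        return p;
    }
    // Slow path: lock, then re-check, since another thread may have
    // initialized it between our load and taking the lock.
    std::lock_guard<std::mutex> guard(g_inst_mutex);
    p = g_inst.load(std::memory_order_relaxed);
    if (p == nullptr) {
        p = new Singleton;
        // release pairs with the acquire load on the fast path.
        g_inst.store(p, std::memory_order_release);
    }
    return p;
}
```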
Now some readers will ask: what is this "concurrency degree"? Let's look at TaskControl::init() first.

TaskControl::init

Skipping the unimportant parts and looking straight at the for loop: initializing the TaskControl boils down to calling pthread_create() to spawn `concurrency` worker threads, where concurrency is the concurrency degree mentioned above. This is where the thread : coroutine = M : N mapping mentioned earlier starts to take shape.
```cpp
int TaskControl::init(int concurrency) {
    if (_concurrency != 0) {
        LOG(ERROR) << "Already initialized";
        return -1;
    }
    if (concurrency <= 0) {
        LOG(ERROR) << "Invalid concurrency=" << concurrency;
        return -1;
    }
    _concurrency = concurrency;
    // Make sure TimerThread is ready.
    if (get_or_create_global_timer_thread() == NULL) {
        LOG(ERROR) << "Fail to get global_timer_thread";
        return -1;
    }
    _workers.resize(_concurrency);
    for (int i = 0; i < _concurrency; ++i) {
        const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
        if (rc) {
            LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
            return -1;
        }
    }
    _worker_usage_second.expose("bthread_worker_usage");
    _switch_per_second.expose("bthread_switch_second");
    _signal_per_second.expose("bthread_signal_second");
    _status.expose("bthread_group_status");

    // Wait for at least one group is added so that choose_one_group()
    // never returns NULL.
    // TODO: Handle the case that worker quits before add_group.
    while (_ngroup == 0) {
        usleep(100);  // TODO: Elaborate
    }
    return 0;
}
```
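For what it's worth, the worker count is also adjustable from user code: as far as I know bthread exposes bthread_setconcurrency()/bthread_getconcurrency(), and --bthread_concurrency can be set as a gflag. A hedged sketch:

```cpp
#include <bthread/bthread.h>
#include <cstdio>

int main() {
    // Best called before any bthread starts; if I read the code right,
    // raising the value later adds workers, but shrinking a running
    // pool is not supported.
    bthread_setconcurrency(16);
    printf("workers=%d\n", bthread_getconcurrency());
    return 0;
}
```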
Here we can see that every worker thread runs the callback worker_thread; let's look at its source.

TaskControl::worker_thread

It first recovers the TaskControl pointer, creates a TaskGroup with create_group(), stores it into tls_task_group, bumps _nworkers by one, and then runs the main task loop run_main_task(). Note that this is effectively an infinite loop; once it does exit, the function logs some statistics and performs cleanup.
```cpp
void* TaskControl::worker_thread(void* arg) {
    run_worker_startfn();
#ifdef BAIDU_INTERNAL
    logging::ComlogInitializer comlog_initializer;
#endif
    TaskControl* c = static_cast<TaskControl*>(arg);
    TaskGroup* g = c->create_group();
    TaskStatistics stat;
    if (NULL == g) {
        LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
        return NULL;
    }
    BT_VLOG << "Created worker=" << pthread_self()
            << " bthread=" << g->main_tid();
    tls_task_group = g;
    c->_nworkers << 1;   // _nworkers is a bvar::Adder; operator<< adds 1
    g->run_main_task();

    stat = g->main_stat();
    BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
            << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
            << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
    tls_task_group = NULL;
    g->destroy_self();
    c->_nworkers << -1;  // << -1 subtracts 1
    return NULL;
}
```
Good. Let's analyze create_group and run_main_task next.

TaskControl::create_group()

It news a TaskGroup, sizes its queues with TaskGroup::init, then registers it via _add_group(); on any failure the group is deleted. Let's look at TaskGroup::init first.
```cpp
TaskGroup* TaskControl::create_group() {
    TaskGroup* g = new (std::nothrow) TaskGroup(this);
    if (NULL == g) {
        LOG(FATAL) << "Fail to new TaskGroup";
        return NULL;
    }
    if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
        LOG(ERROR) << "Fail to init TaskGroup";
        delete g;
        return NULL;
    }
    if (_add_group(g) != 0) {
        delete g;
        return NULL;
    }
    return g;
}
```
TaskGroup::init

Here runqueue_capacity controls the sizes of the rq and remote_rq queues; the flag defaults to 4096, with remote_rq getting half of that. Then comes get_stack(). Once the main stack is obtained, a resource-pool slot is set up and the main task's TaskMeta is fetched from the pool.
```cpp
int TaskGroup::init(size_t runqueue_capacity) {
    if (_rq.init(runqueue_capacity) != 0) {
        LOG(FATAL) << "Fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
        LOG(FATAL) << "Fail to init _remote_rq";
        return -1;
    }
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    if (NULL == stk) {
        LOG(FATAL) << "Fail to get main stack container";
        return -1;
    }
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (NULL == m) {
        LOG(FATAL) << "Fail to get TaskMeta";
        return -1;
    }
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}
```
get_stack

Here is the source of get_stack(). The body is a single switch over the stack types (STACK_TYPE_PTHREAD simply returns NULL):

| Stack type | Stack size |
| --- | --- |
| STACK_TYPE_SMALL | 32K (32768) |
| STACK_TYPE_NORMAL | 1M (1048576) |
| STACK_TYPE_LARGE | 8M (8388608) |

plus STACK_TYPE_MAIN. The first three fall into one category, served by the generic StackFactory template, while STACK_TYPE_MAIN stands alone as a specialization of that template.
```cpp
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
    switch (type) {
    case STACK_TYPE_PTHREAD:
        return NULL;
    case STACK_TYPE_SMALL:
        return StackFactory<SmallStackClass>::get_stack(entry);
    case STACK_TYPE_NORMAL:
        return StackFactory<NormalStackClass>::get_stack(entry);
    case STACK_TYPE_LARGE:
        return StackFactory<LargeStackClass>::get_stack(entry);
    case STACK_TYPE_MAIN:
        return StackFactory<MainStackClass>::get_stack(entry);
    }
    return NULL;
}
```
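User code never calls get_stack() directly; the stack type comes from the bthread attribute. A hedged sketch using brpc's predefined attributes (the helper name start_with_large_stack is mine):

```cpp
#include <bthread/bthread.h>

static void* deep_recursion(void* /*arg*/) { /* ... */ return NULL; }

void start_with_large_stack() {
    bthread_t tid;
    // The predefined attrs select the stack class: BTHREAD_ATTR_SMALL /
    // BTHREAD_ATTR_NORMAL / BTHREAD_ATTR_LARGE map to the 32K/1M/8M
    // stacks in the table above, while BTHREAD_ATTR_PTHREAD runs the
    // task directly on the worker pthread's own stack.
    bthread_start_background(&tid, &BTHREAD_ATTR_LARGE, deep_recursion, NULL);
    bthread_join(tid, NULL);
}
```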
First, the definition of the generic template:
```cpp
template <typename StackClass> struct StackFactory {
    struct Wrapper : public ContextualStack {
        explicit Wrapper(void (*entry)(intptr_t)) {
            if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                       FLAGS_guard_page_size) != 0) {
                storage.zeroize();
                context = NULL;
                return;
            }
            context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
            stacktype = (StackType)StackClass::stacktype;
        }
        ~Wrapper() {
            if (context) {
                context = NULL;
                deallocate_stack_storage(&storage);
                storage.zeroize();
            }
        }
    };

    static ContextualStack* get_stack(void (*entry)(intptr_t)) {
        return butil::get_object<Wrapper>(entry);
    }

    static void return_stack(ContextualStack* sc) {
        butil::return_object(static_cast<Wrapper*>(sc));
    }
};
```
This is the generic StackFactory template: two functions plus an inner class Wrapper. One function gets a stack and the other returns it, inverse operations of each other. The Wrapper constructor initializes three fields: storage via allocate_stack_storage(), context via bthread_make_fcontext(), and stacktype, all of which are members of ContextualStack:
```cpp
struct ContextualStack {
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};
```
Having covered the generic template, let's talk about the specialization.
```cpp
template <> struct StackFactory<MainStackClass> {
    static ContextualStack* get_stack(void (*)(intptr_t)) {
        ContextualStack* s = new (std::nothrow) ContextualStack;
        if (NULL == s) {
            return NULL;
        }
        s->context = NULL;
        s->stacktype = STACK_TYPE_MAIN;
        s->storage.zeroize();
        return s;
    }

    static void return_stack(ContextualStack* s) {
        delete s;
    }
};
```
The specialization is shorter than the generic template, so its idea is simpler: it has no Wrapper and thus never calls bthread_make_fcontext() to set up a context, because the "main stack" is just the worker pthread's own stack.
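Note that get_stack()/return_stack() ride on butil's object pool, so stacks (including the cost of Wrapper construction) are recycled rather than re-allocated every time. A toy free-list showing the shape of that idea (my sketch, not butil's implementation, cleanup omitted for brevity):

```cpp
#include <vector>

// get() reuses a previously returned object when possible; return_obj()
// caches it for the next get(). butil::get_object/return_object are a
// thread-cached, lock-free version of this idea.
template <typename T>
class ToyPool {
public:
    T* get() {
        if (!_free.empty()) {
            T* p = _free.back();
            _free.pop_back();
            return p;
        }
        return new T();
    }
    void return_obj(T* p) { _free.push_back(p); }
private:
    std::vector<T*> _free;
};
```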
TaskGroup::run_main_task()
```cpp
void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
            char name[32];
#if defined(OS_MACOSX)
            snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
                     pthread_numeric_id());
#else
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
                     (long)syscall(SYS_gettid));
#endif
            usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
                             (name, &cumulated_cputime, 1));
        }
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}
```
run_main_task has three key functions:

- wait_task (wait)
wait_task waits until a task can be found, and this is where work stealing comes in. Let's look at the source of wait_task to see how the stealing is actually done.

TaskGroup::wait_task
```cpp
bool TaskGroup::wait_task(bthread_t* tid) {
    do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
#endif
    } while (true);
}
```
Two unfamiliar things show up here: _last_pl_state and stopped(). Let's find the source of stopped() first.
```cpp
class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
public:
    class State {
    public:
        State(): val(0) {}
        bool stopped() const { return val & 1; }
    private:
        friend class ParkingLot;
        State(int val) : val(val) {}
        int val;
    };

    ParkingLot() : _pending_signal(0) {}

    // Wake up at most `num_task' workers.
    // Returns #workers woken up.
    int signal(int num_task) {
        _pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
        return futex_wake_private(&_pending_signal, num_task);
    }

    // Get a state for later wait().
    State get_state() {
        return _pending_signal.load(butil::memory_order_acquire);
    }

    // Wait for tasks.
    // If the `expected_state' does not match, wait() may finish directly.
    void wait(const State& expected_state) {
        futex_wait_private(&_pending_signal, expected_state.val, NULL);
    }

    // Wakeup suspended wait() and make them unwaitable ever.
    void stop() {
        _pending_signal.fetch_or(1);
        futex_wake_private(&_pending_signal, 10000);
    }
private:
    // higher 31 bits for signalling, LSB for stopping.
    butil::atomic<int> _pending_signal;
};
```
Looking up stopped() pulls out a whole class, so we can't avoid studying it. It has an inner class State, and ParkingLot is State's friend. Look at the member function signal() first: it wakes up at most num_task workers. The fetch_add adds num_task << 1, i.e. effectively num_task * 2, because the lowest bit is reserved as the stop flag (see stop() and stopped()); as the comment on _pending_signal says, the signal count lives in the higher 31 bits. signal() then calls futex_wake_private. These futex helpers are thin wrappers over the futex syscall; for example the futex_wait_private used by wait():
```cpp
inline int futex_wait_private(
    void* addr1, int expected, const timespec* timeout) {
    return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG),
                   expected, timeout, NULL, 0);
}
```
This function is a single return statement: a thin wrapper around the SYS_futex system call (futex_wake_private is the same idea with FUTEX_WAKE).
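To make the bit layout concrete, here is a tiny standalone model of the _pending_signal encoding (my sketch, with the futex calls replaced by plain atomics): the LSB is the stop flag, so signal() adds num_task << 1 to bump the counter in the upper 31 bits without disturbing the stop bit.

```cpp
#include <atomic>
#include <cassert>

std::atomic<int> pending_signal{0};

int main() {
    // signal(3): add 3 << 1; the upper bits count signals, LSB untouched.
    pending_signal.fetch_add(3 << 1, std::memory_order_release);
    assert((pending_signal.load() & 1) == 0);   // not stopped
    assert((pending_signal.load() >> 1) == 3);  // 3 signals recorded

    // stop(): set the LSB; stopped() == (val & 1).
    pending_signal.fetch_or(1);
    assert((pending_signal.load() & 1) == 1);   // stopped
    return 0;
}
```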
Back in wait_task: wait() blocks until some producer signals the parking lot, and the woken worker then calls steal_task().
TaskGroup::steal_task

First try to pop a task from this group's own _remote_rq; if that fails, steal from other groups through the global TaskControl.
```cpp
bool steal_task(bthread_t* tid) {
    if (_remote_rq.pop(tid)) {
        return true;
    }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
    _last_pl_state = _pl->get_state();
#endif
    return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
```
The TaskControl::steal_task() it calls is below.

TaskControl::steal_task

Pick a starting TaskGroup pseudo-randomly, try to steal from its rq first, and fall back to its remote_rq on failure; in other words, rq has the higher priority. Why this order? As I understand it, it is about contention: when several TaskGroups are hunting for work at once, a group popping from its own rq can already race with other groups coming over to steal from that same rq, so the lock-free rq is probed first and the mutex-protected remote_rq last.
```cpp
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}
```
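The seed/offset pair makes each worker scan the groups in its own pseudo-random order. A quick standalone illustration with hypothetical numbers: as long as the offset is co-prime with ngroup, `s += offset` visits every group exactly once per sweep.

```cpp
#include <cstdio>

int main() {
    const size_t ngroup = 5;
    size_t seed = 2, offset = 3;  // offset co-prime with ngroup
    size_t s = seed;
    // One sweep, same shape as the loop in TaskControl::steal_task.
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        printf("probe group %zu\n", s % ngroup);  // prints 2 0 3 1 4
    }
    return 0;
}
```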
TaskGroup::sched_to
- sched_to (switch stacks)
First the incoming next_tid is resolved to its TaskMeta, next_meta. If next_meta has no stack yet, get_stack() allocates a ContextualStack stk matching its stack type and attaches it via set_stack(); if allocation fails (or the type is pthread), the task falls back to the main stack and runs directly on the pthread. Then the overloaded sched_to is called.
```cpp
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
            // in latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
            // This basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // Update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}
```
TaskGroup::sched_to

First it records some timing data, then checks whether the next TaskMeta equals the current one; only when they differ does it switch stacks. tls_bls is the current TaskMeta's bthread-local storage: it is saved back into cur_meta first, then replaced by next_meta's local storage, and jump_stack() finally performs the stack switch. jump_stack() is written in assembly, which I am not good at, so I won't expand on it here.
```cpp
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
#ifndef NDEBUG
    if ((++g->_sched_recursive_guard) > 1) {
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++g->_nswitch;
    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // Logging must be done after switching the local storage, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef NDEBUG
            else {
                // else pthread_task is switching to another pthread_task, sc
                // can only equal when they're both _main_stack
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }

    // Restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;

#ifndef NDEBUG
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}
```
- task_runner (run)
Now let's look at the source of task_runner.

TaskGroup::task_runner

The source is long. Since run_main_task passes 1 as task_runner's argument, the if block at the top is skipped. Then comes a do-while loop: run the task's callback fn, clean up its thread-local variables, increment the version and wake up all joiners, decrease _nbthreads by one, then call ending_sched() to look for the next task. The loop terminates when the current meta becomes the main task again, i.e. _cur_meta->tid == _main_tid.
```cpp
void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    //       different groups.
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
#ifndef NDEBUG
        --g->_sched_recursive_guard;
#endif
    }

    do {
        // A task can be stopped before it gets running, in which case
        // we may skip user function, but that may confuse user:
        // Most tasks have variables to remember running result of the task,
        // which is often initialized to values indicating success. If an
        // user function is never called, the variables will be unchanged
        // however they'd better reflect failures because the task is stopped
        // abnormally.

        // Meta and identifier of the task is persistent in this run.
        TaskMeta* const m = g->_cur_meta;

        if (FLAGS_show_bthread_creation_in_vars) {
            // NOTE: the thread triggering exposure of pending time may spend
            // considerable time because a single bvar::LatencyRecorder
            // contains many bvar.
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
        }

        // Not catch exceptions except ExitException which is for implementing
        // bthread_exit(). User code is intended to crash when an exception is
        // not caught explicitly. This is consistent with other threading
        // libraries.
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        // Logging must be done before returning the keytable, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        // FIXME: the time from quiting fn to here is not counted into cputime
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
            LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }

        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }

        // Increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. Any access
        // or join to the bthread after changing version will be rejected.
        // The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);

        g->_control->_nbthreads << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);

    // Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
    // tasks to run, quit for more tasks.
}
```
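One detail worth noting: the only exception task_runner catches is ExitException, which is how bthread_exit() unwinds a bthread. A hedged usage sketch:

```cpp
#include <bthread/bthread.h>

// bthread_exit() ends the current bthread early; task_runner catches the
// internal ExitException and takes its value as fn's return value (which,
// per the TODO above, is currently not saved anywhere).
static void* quit_early(void* /*arg*/) {
    bthread_exit((void*)1);  // never returns
    return NULL;             // unreachable, keeps the signature happy
}
```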
Good. Now that we are done with get_or_new_task_control(), let's climb back up one level: next comes the call to start_background<true>. Note that the if (g) branch of bthread_start_background also calls start_background, just with false instead of true as the template argument. Let's look at its source.

TaskGroup::start_background

The first part of the function simply initializes a batch of TaskMeta fields. The true/false difference shows up in the final if: when the template argument is true, ready_to_run_remote() is what runs.
```cpp
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}
```
TaskGroup::ready_to_run_remote

Lock the target TaskGroup's _remote_rq mutex, then push the tid; the while loop sleeps 1ms and retries whenever the push fails, and the only reason a push can fail is that the queue is full. nosignal is usually false, so we normally take the else branch, which ends with a call to signal_task.
```cpp
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);
    }
}
```
TaskControl::signal_task

If num_task is greater than 2 it is capped at 2. As I understand it (and as the TODO comment hints), a larger num_task would wake more consumers faster, but signaling is a blocking syscall, and signaling on every produce would slow down the producing side in bthread_start_background; capping it strikes a balance between performance and timeliness of scheduling.
```cpp
void TaskControl::signal_task(int num_task) {
    if (num_task <= 0) {
        return;
    }
    // TODO(gejun): Current algorithm does not guarantee enough threads will
    // be created to match caller's requests. But in another side, there's also
    // many useless signalings according to current impl. Capping the concurrency
    // is a good balance between performance and timeliness of scheduling.
    if (num_task > 2) {
        num_task = 2;
    }
    int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
    num_task -= _pl[start_index].signal(1);
    if (num_task > 0) {
        for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
            if (++start_index >= PARKING_LOT_NUM) {
                start_index = 0;
            }
            num_task -= _pl[start_index].signal(1);
        }
    }
    if (num_task > 0 &&
        FLAGS_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
        _concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
        // TODO: Reduce this lock
        BAIDU_SCOPED_LOCK(g_task_control_mutex);
        if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
            add_workers(1);
        }
    }
}
```
When the template argument is false, ready_to_run is called instead.

TaskGroup::ready_to_run

ready_to_run is much simpler than ready_to_run_remote. The difference between the two: ready_to_run() pushes the task into this TaskGroup's own rq, while ready_to_run_remote() handles the case where the current thread is not a brpc worker (the bthread was created outside a worker), pushing the task through the TaskControl into some TaskGroup's remote_rq.
```cpp
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
    push_rq(tid);
    if (nosignal) {
        ++_num_nosignal;
    } else {
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled += 1 + additional_signal;
        _control->signal_task(1 + additional_signal);
    }
}
```
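The nosignal path exists mainly for batch creation, matching the comment in start_from_non_worker: defer the wakeups, then flush once. A hedged sketch (the helper start_batch is mine):

```cpp
#include <bthread/bthread.h>

static void* work(void* arg) { return arg; }

void start_batch(bthread_t* tids, int n) {
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
    attr.flags |= BTHREAD_NOSIGNAL;  // enqueue without waking workers
    for (int i = 0; i < n; ++i) {
        bthread_start_background(&tids[i], &attr, work, NULL);
    }
    bthread_flush();  // signal workers once for the whole batch
}
```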
With all that said, let's return to the two creation functions from the beginning; the other one is bthread_start_urgent.

bthread_start_urgent

It is mostly the same, but when a TaskGroup already exists (we are on a worker), the function called differs slightly: here it is TaskGroup::start_foreground.
```cpp
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
    bthread::TaskGroup* g = bthread::tls_task_group;
    if (g) {
        // start from worker
        return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
```
TaskGroup::start_foreground

As the source shows, this creation function also takes over running the new bthread right away (it sched_to's into it), so the cost differs too; the test run below shows the gap.
```cpp
int TaskGroup::start_foreground(TaskGroup** pg,
                                bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }

    TaskGroup* g = *pg;
    g->_control->_nbthreads << 1;
    if (g->is_current_pthread_task()) {
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}
```
The test here compares the creation cost of bthread_start_urgent against bthread_start_background: run 10000 iterations (the first 100 are treated as warm-up), accumulate the elapsed time of each method, and divide by the repetition count to get the average creation latency of each.
```cpp
TEST_F(BthreadTest, start_latency_when_high_idle) {
    bool warmup = true;
    long elp1 = 0;
    long elp2 = 0;
    int rep = 0;
    for (int i = 0; i < 10000; ++i) {
        butil::Timer tm;
        tm.start();
        bthread_t th;
        bthread_start_urgent(&th, NULL, log_start_latency, &tm);
        bthread_join(th, NULL);

        bthread_t th2;
        butil::Timer tm2;
        tm2.start();
        bthread_start_background(&th2, NULL, log_start_latency, &tm2);
        bthread_join(th2, NULL);

        if (!warmup) {
            ++rep;
            elp1 += tm.n_elapsed();
            elp2 += tm2.n_elapsed();
        } else if (i == 100) {
            warmup = false;
        }
    }
    LOG(INFO) << "start_urgent=" << elp1 / rep << "ns start_background="
              << elp2 / rep << "ns";
}
```
This is the test code; after building it with make and running the script, we can look at the numbers.

It turns out the gap is small, but bthread_start_background is still a bit faster than bthread_start_urgent.

That is my understanding of bthread; corrections are welcome!