菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于parkinglot的bthread调度brpc源码解析(八)—— 基础类eventdispatcher详解brpc源码解析(九)—— 基础类workstealingqueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— reducer类和adder类解析brpc源码解析(十二)—— 核心组件bvar详解 agentgroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— mpsc队列executionqueue详解brpc源码解析(十九)—— 双buffer数据结构doublybuffereddata详解brpc源码解析(二十)—— 用于访问下游的channel类详解

brpc的精华bthread源码剖析-ag真人游戏

阅读 : 449

首先我们先明确几个概念。何谓brpc,bthread,协程

  • brpc
    百度内最常使用的工业级rpc框架,rpc把网络交互类比为“client端访问server端上的函数”:client向server发送request后开始等待,知道server收到请求后,处理、回复client,client又再度恢复并根据response做出反应。
  • bthread
    bthread是brpc开发的一套协程,可以通俗的理解为轻量级的线程
  • 协程
    协程是单线程下的并发,又称微线程,纤程。它是实现多任务的另一种方式,只不过是比线程更小的执行单元。因为它自带cpu的上下文,这样只要在合适的时机,我们可以把一个协程切换到另一个协程。

正如标题所说,brpc的精华全部都在bthread上,而bthread就是我们brpc开发的一套“协程”。而进程,线程,和bthread的关系是什么样的呢?一个进程里面可以开辟多个线程,而线程和协程的关系呢。在微信开源的libco上,线程 :协程 = 1 :n。而在bthread上 线程 :协程 = m :n,而bthread实现的关键就是工作窃取算法。后续会展开描述。
bthread有三大件:

  • taskcontrol(进程内唯一)
  • taskgroup(线程内唯一)
  • taskmeta(bthread上下文)

由于函数量巨大,所以我绘制了一张函数总体思路总结表,供大家方便理解。

我们从创建函数开始讲起,并且以展开的方式依次对一些我能理解的地方进行剖析。首先我们介绍的是bthread创建函数。

bthread创建函数

好的 下面我一一讲解这两个bthread创建函数。

首先我们会去尝试获取到bthread的tls_task_group,而我们查看tls_task_group的定义处可以看到
__thread taskgroup* tls_task_group = null; 那就证明如果没有tls_task_group的话则需要执行到return中的语句start_from_non_worker(),接下来我们查看到这个函数的定义。

//其一
int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) {
  
    bthread::taskgroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return g->start_background(tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
//其二
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
  
    bthread::taskgroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return bthread::taskgroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

start_from_non_worker

这个函数一开始,我们会进行对taskcontrol的获取或者新建taskcontrol,也就是函数get_or_new_task_control()。

start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
  
    taskcontrol* c = get_or_new_task_control();
    if (null == c) {
  
        return enomem;
    }
    if (attr != null && (attr->flags & bthread_nosignal)) {
  
        // remember the taskgroup to insert nosignal tasks for 2 reasons:
        // 1. nosignal is often for creating many bthreads in batch,
        //    inserting into the same taskgroup maximizes the batch.
        // 2. bthread_flush() needs to know which taskgroup to flush.
        taskgroup* g = tls_task_group_nosignal;
        if (null == g) {
  
            g = c->choose_one_group();
            tls_task_group_nosignal = g;
        }
        return g->start_background(tid, attr, fn, arg);
    }
    return c->choose_one_group()->start_background(
        tid, attr, fn, arg);
}

然后我们查看这个函数的定义:

get_or_new_task_control()

这个函数一开始先对全局变量tc(g_task_control)初始化原子变量,然后通过原子变量进行load,取出tc指针,如果不为空,直接返回。然后对竞争加上自旋锁,重复上一操作。如果加自旋锁也没有获取到tc指针,则说明tc指针确实为null,所以我们new一个tc指针,并且用并发度concurrency来初始化全局tc,最后蒋全局tc存入原子变量并返回。

inline taskcontrol* get_or_new_task_control() {
  
    butil::atomic* p = (butil::atomic*)&g_task_control;
    taskcontrol* c = p->load(butil::memory_order_consume);
    if (c != null) {
  
        return c;
    }
    baidu_scoped_lock(g_task_control_mutex);
    c = p->load(butil::memory_order_consume);
    if (c != null) {
  
        return c;
    }
    c = new (std::nothrow) taskcontrol;
    if (null == c) {
  
        return null;
    }
    int concurrency = flags_bthread_min_concurrency > 0 ?
        flags_bthread_min_concurrency :
        flags_bthread_concurrency;
    if (c->init(concurrency) != 0) {
  
        log(error) << "fail to init g_task_control";
        delete c;
        return null;
    }
    p->store(c, butil::memory_order_release);
    return c;
}

现在有同学就有疑问了,并发度指的是什么?我们先看taskcontrol::init()的源码。

taskcontrol::init

我们抛开不重要的东西,我们只看到for循环那里,我们可以看到tc的初始化,就是调用了pthread_create()去创建了concurrency个线程,而concurrency就是我们之前提到的并发度,而我之前提到的线程:协程 = n:m在这里就开始展露头角了。

int taskcontrol::init(int concurrency) {
  
    if (_concurrency != 0) {
  
        log(error) << "already initialized";
        return -1;
    }
    if (concurrency <= 0) {
  
        log(error) << "invalid concurrency=" << concurrency;
        return -1;
    }
    _concurrency = concurrency;
    // make sure timerthread is ready.
    if (get_or_create_global_timer_thread() == null) {
  
        log(error) << "fail to get global_timer_thread";
        return -1;
    }
    
    _workers.resize(_concurrency);   
    for (int i = 0; i < _concurrency;   i) {
  
        const int rc = pthread_create(&_workers[i], null, worker_thread, this);
        if (rc) {
  
            log(error) << "fail to create _workers[" << i << "], " << berror(rc);
            return -1;
        }
    }
    _worker_usage_second.expose("bthread_worker_usage");
    _switch_per_second.expose("bthread_switch_second");
    _signal_per_second.expose("bthread_signal_second");
    _status.expose("bthread_group_status");
    // wait for at least one group is added so that choose_one_group()
    // never returns null.
    // todo: handle the case that worker quits before add_group
    while (_ngroup == 0) {
  
        usleep(100);  // todo: elaborate
    }
    return 0;
}

在这里我们可以看到每个线程有一个回调函数worker_thread,下面我们看到这个回调函数的源码。

taskcontrol::worker_thread

首先我们先获取一个tc的指针,然后用create_group创建一个tg,然后初始化tls_task_group,并且把_nworkers加一,然后运行主任务run_main_task,注意了,这里是一个死循环,如果推出了一些死循环,就输出一些日志,并且进行清理工作。

void* taskcontrol::worker_thread(void* arg) {
  
    run_worker_startfn();    
#ifdef baidu_internal
    logging::comloginitializer comlog_initializer;
#endif
    
    taskcontrol* c = static_cast(arg);
    taskgroup* g = c->create_group();
    taskstatistics stat;
    if (null == g) {
  
        log(error) << "fail to create taskgroup in pthread=" << pthread_self();
        return null;
    }
    bt_vlog << "created worker=" << pthread_self()
            << " bthread=" << g->main_tid();
    tls_task_group = g;
    c->_nworkers << 1;
    g->run_main_task();
    stat = g->main_stat();
    bt_vlog << "destroying worker=" << pthread_self() << " bthread="
            << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
            << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
    tls_task_group = null;
    g->destroy_self();
    c->_nworkers << -1;
    return null;
}

好的,这里我们先对create_group和run_main_task的源码进行分析一下。

taskcontrol::create_group()

这里我们先创建了一个tg,然后用taskgroup::init进行初始化大小。然后用_add_group判断非工作线程,如果是则delete,这里我们先看到taskgroup::init函数。

taskgroup* taskcontrol::create_group() {
  
    taskgroup* g = new (std::nothrow) taskgroup(this);
    if (null == g) {
  
        log(fatal) << "fail to new taskgroup";
        return null;
    }
    if (g->init(flags_task_group_runqueue_capacity) != 0) {
  
        log(error) << "fail to init taskgroup";
        delete g;
        return null;
    }
    if (_add_group(g) != 0) {
  
        delete g;
        return null;
    }
    return g;
}

taskgroup::init

这里我们可以看到这个函数用runqueue_capacity来控制这rq和remote_rq队列的大小。而这个数值的默认值为4096,然后进行get_stack()操作。进行完get_stack()操作之后,然后建立资源池slot,然后从资源池中获取tm。

int taskgroup::init(size_t runqueue_capacity) {
  
    if (_rq.init(runqueue_capacity) != 0) {
  
        log(fatal) << "fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
  
        log(fatal) << "fail to init _remote_rq";
        return -1;
    }
    contextualstack* stk = get_stack(stack_type_main, null);
    if (null == stk) {
  
        log(fatal) << "fail to get main stack container";
        return -1;
    }
    butil::resourceid slot;
    taskmeta* m = butil::get_resource(&slot);
    if (null == m) {
  
        log(fatal) << "fail to get taskmeta";
        return -1;
    }
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = null;
    m->arg = null;
    m->local_storage = local_storage_init;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = empty_stat;
    m->attr = bthread_attr_taskgroup;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);
    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

get_stack

这是get_stack()的源码,这个函数里面只有一个switch操作,有四个栈类型选项。分别是:

栈名 栈大小
stack_type_small 32k(32768)
stack_type_normal 1m(1048576)
stack_type_large 8m(8388608)

还有一个是stack_type_main

前三个都归为一类,是stackfactory的通用模板,而最后一个另起一类,是其的特例化模板。

inline contextualstack* get_stack(stacktype type, void (*entry)(intptr_t)) {
  
    switch (type) {
  
    case stack_type_pthread:
        return null;
    case stack_type_small:
        return stackfactory::get_stack(entry);
    case stack_type_normal:
        return stackfactory::get_stack(entry);
    case stack_type_large:
        return stackfactory::get_stack(entry);
    case stack_type_main:
        return stackfactory::get_stack(entry);
    }
    return null;
}

首先我们看到通用模板的定义:

template  struct stackfactory {
  
    struct wrapper : public contextualstack {
  
        explicit wrapper(void (*entry)(intptr_t)) {
  
            if (allocate_stack_storage(&storage, *stackclass::stack_size_flag,
                                       flags_guard_page_size) != 0) {
  
                storage.zeroize();
                context = null;
                return;
            }
            context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
            stacktype = (stacktype)stackclass::stacktype;
        }
        ~wrapper() {
  
            if (context) {
  
                context = null;
                deallocate_stack_storage(&storage);
                storage.zeroize();
            }
        }
    };
    
    static contextualstack* get_stack(void (*entry)(intptr_t)) {
  
        return butil::get_object(entry);
    }
    
    static void return_stack(contextualstack* sc) {
  
        butil::return_object(static_cast(sc));
    }
};

这是stackfactory的通用模板,我们可以发现两个函数和一个内部类wapper。这两个函数一个是获取栈,一个是归还栈,这两个操作是互逆的。这个内部类的bthread_make_fcontext函数会对其三个变量进行初始化,而这三个变量都是contextualstack的成员。

struct contextualstack {
  
    bthread_fcontext_t context;
    stacktype stacktype;
    stackstorage storage;
};

说完了通用模板,我们来说一下特例化模板。

template <> struct stackfactory {
  
    static contextualstack* get_stack(void (*)(intptr_t)) {
  
        contextualstack* s = new (std::nothrow) contextualstack;
        if (null == s) {
  
            return null;
        }
        s->context = null;
        s->stacktype = stack_type_main;
        s->storage.zeroize();
        return s;
    }
    
    static void return_stack(contextualstack* s) {
  
        delete s;
    }
};

特例化模板的代码量比通用模板的代码量要少,那么说明特例化模板的思路更加简单啦。因为特例化没有wapper,也就没有调用那个分配上下文的函数bthread_make_fcontext()。

taskgroup::run_main_task()

void taskgroup::run_main_task() {
  
    bvar::passivestatus cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr > > usage_bvar;
    taskgroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
  
        taskgroup::sched_to(&dummy, tid);
        dcheck_eq(this, dummy);
        dcheck_eq(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
  
            taskgroup::task_runner(1/*skip remained*/);
        }
        if (flags_show_per_worker_usage_in_vars && !usage_bvar) {
  
            char name[32];
#if defined(os_macosx)
            snprintf(name, sizeof(name), "bthread_worker_usage_%" priu64,
                     pthread_numeric_id());
#else
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
                     (long)syscall(sys_gettid));
#endif
            usage_bvar.reset(new bvar::persecond >
                             (name, &cumulated_cputime, 1));
        }
    }
    // don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns  = butil::cpuwide_time_ns() - _last_run_ns;
}

run_main_task的三大关键函数。

  • wait_task(等)
    wait_task就是等待找到任务,而这里就会涉及到工作窃取。下面我们先去看一下wait_task的源代码,看看究竟是怎么做到工作窃取的。

run_main_task

bool taskgroup::wait_task(bthread_t* tid) {
  
    do {
  
#ifndef bthread_dont_save_parking_state
        if (_last_pl_state.stopped()) {
  
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
  
            return true;
        }
    } while (true);
}

这里我们先看到_last_pl_state和stopped()两个新颖的东西,我们先看到一下stopped的源码。

class baidu_cacheline_alignment parkinglot {
  
public:
    class state {
  
    public:
        state(): val(0) {
  }
        bool stopped() const {
   return val & 1; }
    private:
    friend class parkinglot;
        state(int val) : val(val) {
  }
        int val;
    };
    parkinglot() : _pending_signal(0) {
  }
    // wake up at most `num_task' workers.
    // returns #workers woken up.
    int signal(int num_task) {
  
        _pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
        return futex_wake_private(&_pending_signal, num_task);
    }
    // get a state for later wait().
    state get_state() {
  
        return _pending_signal.load(butil::memory_order_acquire);
    }
    // wait for tasks.
    // if the `expected_state' does not match, wait() may finish directly.
    void wait(const state& expected_state) {
  
        futex_wait_private(&_pending_signal, expected_state.val, null);
    }
    // wakeup suspended wait() and make them unwaitable ever. 
    void stop() {
  
        _pending_signal.fetch_or(1);
        futex_wake_private(&_pending_signal, 10000);
    }
private:
    // higher 31 bits for signalling, lsb for stopping.
    butil::atomic _pending_signal;
};

好的,看stopped的源码直接给我揪出了一个类,那我们就不避免的去了解一下这个类了。这个类里面有一个内部类state,parkinglot是它的友元。我们先看一下它的成员函数signal,唤醒最多num_task<<1个worker,这个位操作世纪就是把num_task乘了2。接着调用了futex_wake_private。而这个函数的源码为:

inline int futex_wait_private(
    void* addr1, int expected, const timespec* timeout) {
  
    return syscall(sys_futex, addr1, (futex_wait | futex_private_flag),
                   expected, timeout, null, 0);
}

这个函数只有一条语句,就是return语句。之所以要这样做,其实就是对于系统调用sys_futex的封装。
然后我们回到wait_task函数,wait函数就是阻塞等待通知,被通知后执行steal_task()函数。

steal_task

首先是_remote_rq队列中的任务出队,如果没有则全局tc来窃取任务。

bool steal_task(bthread_t* tid) {
  
        if (_remote_rq.pop(tid)) {
  
            return true;
        }
#ifndef bthread_dont_save_parking_state
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

调用到的taskcontrol::steal_task()源码如下:

taskcontrol::steal_task

先随机找一个tg,先从它的rq队列窃取任务,如果失败则从remote_rq队列窃取任务。所以说,rq比remote_rq的优先级更高。这里疑问就来了,为啥是这么个顺序?这里是为了避免资源竞态,避免多个tg等待任务的时候,当前tg从rq中取任务,与其他tg过来自己这边窃取任务造成竞态。

bool taskcontrol::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
  
    // 1: acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
  
        return false;
    }
    // note: don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup;   i, s  = offset) {
  
        taskgroup* g = _groups[s % ngroup];
        // g is possibly null because of concurrent _destroy_group
        if (g) {
  
            if (g->_rq.steal(tid)) {
  
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
  
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

taskgroup::sched_to

  • sched_to(切换栈)
    首先先通过传入的参数next_tid找到tm,next_meta,和对应的contextualstack信息:stk。然后给next_meta设置栈stk。然后调用重载的sched_to函数。
inline void taskgroup::sched_to(taskgroup** pg, bthread_t next_tid) {
  
    taskmeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == null) {
  
        contextualstack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
  
            next_meta->set_stack(stk);
        } else {
  
            // stack_type is bthread_stacktype_pthread or out of memory,
            // in latter case, attr is forced to be bthread_stacktype_pthread.
            // this basically means that if we can't allocate stack, run
            // the task in pthread directly.
            next_meta->attr.stack_type = bthread_stacktype_pthread;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // update now_ns only when wait_task did yield.
    sched_to(pg, next_meta);
}

taskgroup::sched_to

显示记录一些数据,判断下一个tm和当前tm是否相等,如果不相等,则去切换栈。tls_bls是当前tm的局部存储,先做还原,然后赋值成下一个tm的局部存储,然后jump_stack()去切换栈。至于jump_stack()涉及到汇编知识,而我不太擅长,就不展开讲了。

void taskgroup::sched_to(taskgroup** pg, taskmeta* next_meta) {
  
    taskgroup* g = *pg;
#ifndef ndebug
    if ((  g->_sched_recursive_guard) > 1) {
  
        log(fatal) << "recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;
    taskmeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns  = elp_ns;
    if (cur_meta->tid != g->main_tid()) {
  
        g->_cumulated_cputime_ns  = elp_ns;
    }
      cur_meta->stat.nswitch;
       g->_nswitch;
    // switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
  
        g->_cur_meta = next_meta;
        // switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;
        // logging must be done after switching the local storage, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        if ((cur_meta->attr.flags & bthread_log_context_switch) ||
            (next_meta->attr.flags & bthread_log_context_switch)) {
  
            log(info) << "switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }
        if (cur_meta->stack != null) {
  
            if (next_meta->stack != cur_meta->stack) {
  
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef ndebug
            else {
  
                // else pthread_task is switching to another pthread_task, sc
                // can only equal when they're both _main_stack
                check(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
  
        log(fatal) << "bthread=" << g->current_tid() << " sched_to itself!";
    }
    while (g->_last_context_remained) {
  
        remainedfn fn = g->_last_context_remained;
        g->_last_context_remained = null;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }
    // restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;
#ifndef ndebug
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}
  • task_runner(执行)
    下面我们来看一下task_runner的源码

taskgroup::task_runner

源码很长,由于task_runner的输入参数为1,则最上面的if逻辑语句会跳过。然后下面就是一个do-while循环,先执行回调函数,然后清理线程局部变量,累加版本号,唤醒join,_nbthreads减一。然后调用ending_sched()查找下一个任务。当tls_task_group ==main_tid时终止循环。

void taskgroup::task_runner(intptr_t skip_remained) {
  
    // note: tls_task_group is volatile since tasks are moved around
    //       different groups.
    taskgroup* g = tls_task_group;
    if (!skip_remained) {
  
        while (g->_last_context_remained) {
  
            remainedfn fn = g->_last_context_remained;
            g->_last_context_remained = null;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }
#ifndef ndebug
        --g->_sched_recursive_guard;
#endif
    }
    do {
  
        // a task can be stopped before it gets running, in which case
        // we may skip user function, but that may confuse user:
        // most tasks have variables to remember running result of the task,
        // which is often initialized to values indicating success. if an
        // user function is never called, the variables will be unchanged
        // however they'd better reflect failures because the task is stopped
        // abnormally.
        // meta and identifier of the task is persistent in this run.
        taskmeta* const m = g->_cur_meta;
        if (flags_show_bthread_creation_in_vars) {
  
            // note: the thread triggering exposure of pending time may spend
            // considerable time because a single bvar::latencyrecorder
            // contains many bvar.
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000l;
        }
        // not catch exceptions except exitexception which is for implementing
        // bthread_exit(). user code is intended to crash when an exception is
        // not caught explicitly. this is consistent with other threading
        // libraries.
        void* thread_return;
        try {
  
            thread_return = m->fn(m->arg);
        } catch (exitexception& e) {
  
            thread_return = e.value();
        }
        // group is probably changed
        g = tls_task_group;
        // todo: save thread_return
        (void)thread_return;
        // logging must be done before returning the keytable, since the logging lib
        // use bthread local storage internally, or will cause memory leak.
        // fixme: the time from quiting fn to here is not counted into cputime
        if (m->attr.flags & bthread_log_start_and_finish) {
  
            log(info) << "finished bthread " << m->tid << ", cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }
        // clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        keytable* kt = tls_bls.keytable;
        if (kt != null) {
  
            return_keytable(m->attr.keytable_pool, kt);
            // after deletion: tls may be set during deletion.
            tls_bls.keytable = null;
            m->local_storage.keytable = null; // optional
        }
        // increase the version and wake up all joiners, if resulting version
        // is 0, change it to 1 to make bthread_t never be 0. any access
        // or join to the bthread after changing version will be rejected.
        // the spinlock is for visibility of taskgroup::get_attr.
        {
  
            baidu_scoped_lock(m->version_lock);
            if (0 ==   *m->version_butex) {
  
                  *m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);
        g->_control->_nbthreads << -1;
        g->set_remained(taskgroup::_release_last_context, m);
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);
    // was called from a pthread and we don't have bthread_stacktype_pthread
    // tasks to run, quit for more tasks.
}

好的,现在我们讲解完了get_or_new_task_control()函数,我们继续往上一级看。然后会调用start_background< true >函数。由于在bthread_start_background中if(g)也使用了start_background函数,只是后面是true和false的差别。我们一起来看一看这个函数的源码。

taskgroup::start_background

这个函数的前文就是对一些变量的初始化。而前面的true和false的判断就体现在最后的if语句中。如果是真,则运行的是ready_to_run_remote()

template 
int taskgroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
  
    if (__builtin_expect(!fn, 0)) {
  
        return einval;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : bthread_attr_normal);
    butil::resourceid slot;
    taskmeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
  
        return enomem;
    }
    check(m->current_waiter.load(butil::memory_order_relaxed) == null);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    check(m->stack == null);
    m->attr = using_attr;
    m->local_storage = local_storage_init;
    m->cpuwide_start_ns = start_ns;
    m->stat = empty_stat;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & bthread_log_start_and_finish) {
  
        log(info) << "started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (remote) {
  
        ready_to_run_remote(m->tid, (using_attr.flags & bthread_nosignal));
    } else {
  
        ready_to_run(m->tid, (using_attr.flags & bthread_nosignal));
    }
    return 0;
}

taskgroup::ready_to_run_remote

先给当前tg的remote_rq加互斥锁。然后入队,然后一个while循环,入队失败则休眠1ms,再继续入队。入队失败的原因只有一个,那就是队列容量已满。nosigner一般多为false,所以选择到一个else这里,最后调用了一个signal_task。

void taskgroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
  
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
  
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        log_every_second(error) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
  
          _remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
  
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled  = 1   additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1   additional_signal);
    }
}

taskcontrol::signal_task

num_task如果大于2,则重新赋值为2。因为如果num_task越大,消费得也就越快,但是如果一直消费,函数调用是阻塞的,这样bthread_start_background就不好生产任务,在这里也是达到了一种平衡。

void taskcontrol::signal_task(int num_task) {
  
    if (num_task <= 0) {
  
        return;
    }
    // todo(gejun): current algorithm does not guarantee enough threads will
    // be created to match caller's requests. but in another side, there's also
    // many useless signalings according to current impl. capping the concurrency
    // is a good balance between performance and timeliness of scheduling.
    if (num_task > 2) {
  
        num_task = 2;
    }
    int start_index = butil::fmix64(pthread_numeric_id()) % parking_lot_num;
    num_task -= _pl[start_index].signal(1);
    if (num_task > 0) {
  
        for (int i = 1; i < parking_lot_num && num_task > 0;   i) {
  
            if (  start_index >= parking_lot_num) {
  
                start_index = 0;
            }
            num_task -= _pl[start_index].signal(1);
        }
    }
    if (num_task > 0 &&
        flags_bthread_min_concurrency > 0 &&    // test min_concurrency for performance
        _concurrency.load(butil::memory_order_relaxed) < flags_bthread_concurrency) {
  
        // todo: reduce this lock
        baidu_scoped_lock(g_task_control_mutex);
        if (_concurrency.load(butil::memory_order_acquire) < flags_bthread_concurrency) {
  
            add_workers(1);
        }
    }
}

而当为false时,则调用ready_ro_run。

taskgroup::ready_to_run

而ready_to_run相较于ready_to_remote对比就简洁很多了。而这两个的区别就是ready_to_run()就是把任务入队到tg的 rq,ready_to_run_remote()是在当前线程不是brpc的worker()的时候(在worker外创建的 bthread任务),把任务通过tc入队到某个tg的 remote_rq。

void taskgroup::ready_to_run(bthread_t tid, bool nosignal) {
  
    push_rq(tid);
    if (nosignal) {
  
          _num_nosignal;
    } else {
  
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled  = 1   additional_signal;
        _control->signal_task(1   additional_signal);
    }
}

说了这么多了,回归到最初的两个新建函数,还一个新建函数是bthread_start_urgent。

bthread_start_urgent

大致是相同的,但我们会发现在存在tg的情况下,调用的函数会稍微有些区别。这里调用的是taskgroup::start_foreground。

int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
  
    bthread::taskgroup* g = bthread::tls_task_group;
    if (g) {
  
        // start from worker
        return bthread::taskgroup::start_foreground(&g, tid, attr, fn, arg);
    }
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}

taskgroup::start_foreground

我们可以通过源码观察到,其实这个新建bthread函数就是把运行一起承包了,当然,耗时也会有差别。我们在测试运行上可以看出来差距。

int taskgroup::start_foreground(taskgroup** pg,
                                bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
  
    if (__builtin_expect(!fn, 0)) {
  
        return einval;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : bthread_attr_normal);
    butil::resourceid slot;
    taskmeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
  
        return enomem;
    }
    check(m->current_waiter.load(butil::memory_order_relaxed) == null);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    check(m->stack == null);
    m->attr = using_attr;
    m->local_storage = local_storage_init;
    m->cpuwide_start_ns = start_ns;
    m->stat = empty_stat;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & bthread_log_start_and_finish) {
  
        log(info) << "started bthread " << m->tid;
    }
    taskgroup* g = *pg;
    g->_control->_nbthreads << 1;
    if (g->is_current_pthread_task()) {
  
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & bthread_nosignal));
    } else {
  
        // nosignal affects current task, not the new task.
        remainedfn fn = null;
        if (g->current_task()->about_to_quit) {
  
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
  
            fn = ready_to_run_in_worker;
        }
        readytorunargs args = {
  
            g->current_tid(),
            (bool)(using_attr.flags & bthread_nosignal)
        };
        g->set_remained(fn, &args);
        taskgroup::sched_to(pg, m->tid);
    }
    return 0;
}

在这里我们进行的测试是测试利用bthread_start_urgent和bthread_start_background的创建耗时对比。创建10000个样例然后进行除法测出两种方法创建bthread所需时间。

test_f(bthreadtest, start_latency_when_high_idle) {
  
    bool warmup = true;
    long elp1 = 0;
    long elp2 = 0;
    int rep = 0;
    for (int i = 0; i < 10000;   i) {
  
        butil::timer tm;
        tm.start();
        bthread_t th;
        bthread_start_urgent(&th, null, log_start_latency, &tm);
        bthread_join(th, null);
        bthread_t th2;
        butil::timer tm2;
        tm2.start();
        bthread_start_background(&th2, null, log_start_latency, &tm2);
        bthread_join(th2, null);
        if (!warmup) {
  
              rep;
            elp1  = tm.n_elapsed();
            elp2  = tm2.n_elapsed();
        } else if (i == 100) {
  
            warmup = false;
        }
    }
    log(info) << "start_urgent=" << elp1 / rep << "ns start_background="
              << elp2 / rep << "ns";
}

这是一段测试代码,我们通过make然后脚本运行后观察数据。

发现bthread_start_background要比bthread_start_urgent耗时虽说差距不大,但是bthread_start_background还时要快一点。

以上就是我对bthread的理解了,有问题欢迎指出哦!

网站地图