菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于parkinglot的bthread调度brpc源码解析(八)—— 基础类eventdispatcher详解brpc源码解析(九)—— 基础类workstealingqueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— reducer类和adder类解析brpc源码解析(十二)—— 核心组件bvar详解 agentgroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— mpsc队列executionqueue详解brpc源码解析(十九)—— 双buffer数据结构doublybuffereddata详解brpc源码解析(二十)—— 用于访问下游的channel类详解

bthread(四) bthread用户接口和代码执行路径-ag真人游戏

阅读 : 380

bthread用户接口

bthread_t(类似pthread_t):64位int,前32位版本号防止aba问题(释放后又重新被分配了,歧义),后32位为资源池(无全局竞争可o(1)访问的数组)中下标,可以很快地找到taskmeta(里面有个变量就是这个bth的栈)

起bth基本函数

bthread_start_urgent & bthread_start_background 里面都是先判断g是否为空(是否运行在worker里),是的话运行start_xxx,不是运行start_from_non_worker(直接调用start_background

  • start_foreground:如果当前在bthread内,就在当前worker内原地启动(调ready_to_run或ready_to_run_remote)保证locality,把当前bth加到_rq队尾,再去跑新bthread,实现:set_remained(ready_to_run(current_bth)) sched_to(new_bth)
  • start_background:后台起,但是不希望立刻跑这个bthread,将来有时间跑就行,直接调ready_to_run把新bth加到_rq队尾,如果是在start_from_non_worker里起就加到_remote_rq,实现:ready_to_run[_remote](new_bth)
// 让出当前worker立即执行新bthread,当前bthread随后调度
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) __throw {
    bthread::taskgroup* g = bthread::tls_task_group;
    if (g) {
         // 从worker里起的bthread,调ready_to_run,再主动调一遍sched_to去执行这个tid
        return bthread::taskgroup::start_foreground(&g, tid, attr, fn, arg);
    }
    // 一些初始化的工作,最后会创建1个tc、4个pl、9个pthread、9个tg
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
 
// 放到队列里,不急着切线程(sched_to)
int bthread_start_background(bthread_t* __restrict tid,
                             const bthread_attr_t* __restrict attr,
                             void * (*fn)(void*),
                             void* __restrict arg) __throw {
    bthread::taskgroup* g = bthread::tls_task_group;
    if (g) {
        // 从worker里起的bthread,调ready_to_run
        return g->start_background(tid, attr, fn, arg);
    }
    // 同上
    return bthread::start_from_non_worker(tid, attr, fn, arg);
}
 
start_from_non_worker(bthread_t* __restrict tid,
                      const bthread_attr_t* __restrict attr,
                      void * (*fn)(void*),
                      void* __restrict arg) {
    // 核心函数:创建taskgroup,见下
    taskcontrol* c = get_or_new_task_control();
    ...
    // 选择一个taskgroup执行start_background,调ready_to_run_remote
    return c->choose_one_group()->start_background(
        tid, attr, fn, arg);
}
 
inline taskcontrol* get_or_new_task_control() {
    // 全局变量tc(g_task_control)初始化原子变量
    butil::atomic* p = (butil::atomic*)&g_task_control;
    // 通过原子变量进行load,取出tc指针,如果不为空,直接返回
    taskcontrol* c = p->load(butil::memory_order_consume);
    if (c != null) {
        return c;
    }
    ...   
    // 走到这,说明tc确实为null,开始new一个
    c = new (std::nothrow) taskcontrol;
    if (null == c) {
        return null;
    }
    // 用并发度concurrency来初始化全局tc
    int concurrency = flags_bthread_min_concurrency > 0 ?
        flags_bthread_min_concurrency :
        flags_bthread_concurrency;
 
 
    // init函数核心见下
    if (c->init(concurrency) != 0) {
        log(error) << "fail to init g_task_control";
        delete c;
        return null;
    }
 
    // 4. 将全局tc存入原子变量中
    p->store(c, butil::memory_order_release);
    return c;
}
 
// taskcontrol的init函数
for (int i = 0; i < _concurrency;   i) {
    const int rc = pthread_create(&_workers[i], null, worker_thread, this);
    if (rc) {
        log(error) << "fail to create _workers[" << i << "], " << berror(rc);
        return -1;
    }
}
// worker_thread之前有过说明

【代码说明】若taskgroup为空,证明当前是跑在pthread上,调用start_from_non_worker检查是否已经创建了tackcontrol单例,已经创建就无所谓了,没有就new taskcontrol去创建,new后会执行tackcontrol的init,核心就是用pthread启动指定数量的worker。得到tackcontrol单例后,用tackcontrol选取一个tackgroup,新建bthread进行调度;taskgroup不为空,表明此bthread就是在worker内被创建的,在对应tackgroup(worker)执行新bthread的启动即可。

两个常见的阻塞操作

  • yield:把当前的bth加到_rq队尾,执行调度函数选下一个,实现:set_remained(ready_to_run(current_bth)) sched
  • usleep:涉及超时管理,先从当前队里摘掉,把当前bth加到_remote_rq里,当定时器到了,加到_rq里运行,实现:add timer(ready_to_run_remote(current_bth)) sched

基本流程

tc创建后会pthread_create 9个线程去初始化tg,避免全局变量加锁处理,效率低,也要避免惊群。一旦出现某一个bthread可以被偷了会唤醒很多worker,会发生惊群,要处理。

void* taskcontrol::worker_thread(void* arg) {
    // 获取tc指针
    taskcontrol* c = static_cast(arg);
    // 创建一个task_group
    taskgroup* g = c->create_group();
    taskstatistics stat;
    if (null == g) {
        log(error) << "fail to create taskgroup in pthread=" << pthread_self();
        return null;
    }
  
    // 重要变量,定义baidu_thread_local taskgroup* tls_task_group;
    // tls_task_group是一个thread_local变量,且只有由taskcontrol启动的worker线程所拥有的task_group tls_task_group非null
    tls_task_group = g;
    // 计数 1
    c->_nworkers << 1;
    // 当前线程主要运行的任务,即死循环中等待唤醒
    g->run_main_task();
    // ...
    // 销毁
    tls_task_group = null;
    g->destroy_self();
    c->_nworkers << -1;
    return null;
}

每个tg创建后会创建后当前线程的各种变量,然后去调 run_main_task 去循环等待是否有可处理的bthread。

void taskgroup::run_main_task() {
    // ......
    taskgroup* dummy = this;
    bthread_t tid;
    // 这个while就是卡住等bthread id,这个id可能是从队列里取出来的,也可能是偷到的
    while (wait_task(&tid)) {   // 1
        // 这里的唤醒是用futex实现的,也就是利用一个原子变量判断是否需要去尝试加锁访问,然后在实际去加锁访问
        // 走到这里就说明已经获取到一个可执行的bthread,那么调sched_to去切线程
        taskgroup::sched_to(&dummy, tid);   // 2
        dcheck_eq(this, dummy);
        dcheck_eq(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            // 只有当前线程不是调度bthread线程的时候才会去执行用户func
            taskgroup::task_runner(1/*skip remained*/); // 3
        }
        // ......
}
 
// 1 每个worker调度线程卡在的位置
bool taskgroup::wait_task(bthread_t* tid) {
    do {
        // ...
        _pl->wait(_last_pl_state);  
        // wait:内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞
        // 当阻塞结束之后,执行steal_task()来work stealing,如果窃取成功则返回,不成功就继续wait
        if (steal_task(tid)) {
            // 顺序为先看本地_remote_rq再去其他worker偷
            // 偷的顺序为先偷别_rq的再偷_remote_rq的
            return true;
        }
    } while (true);
}
 
// 2 用内核调用实现上下文的切换
inline void taskgroup::sched_to(taskgroup** pg, bthread_t next_tid) {
    // 根据tid找taskmeta
    taskmeta* next_meta = address_meta(next_tid);
    // 给stk用next_meta赋一些值
    if (next_meta->stack == null) {
        contextualstack* stk = get_stack(next_meta->stack_type(), task_runner);
        // ...
    }
    // 重载的sched_to会判断next_meta和cur_meta是否为同一个不是同一个需要调jump_stack去切栈
    // 这其中直接嵌入了汇编代码模拟pthread切栈操作
    sched_to(pg, next_meta);
}
 
// 3 执行用户实际的func并找下一个
void taskgroup::task_runner() {
    taskmeta* const m = g->_cur_meta;
    // 执行应用程序设置的任务函数,在任务函数中可能yield让出cpu,也可能产生新的bthread
    m->fn(m->arg);
    // 任务函数执行完成后,需要唤起等待该任务函数执行结束的pthread/bthread
    butex_wake_except(m->version_butex, 0);
    // ending_sched函数本意:将pthread线程执行流转入下一个可执行的bthread(普通bthread或“调度bthread”)
    // 在ending_sched()内会尝试从本地taskgroup的rq队列中找出下一个bthread
    // 或者调steal_task从其他pthread的taskgroup上steal一个bthread(当然也是先看_remote_rq,再偷)
    // 如果没有bthread可用则下一个被执行的就是pthread的“调度bthread”
    // 然后通过sched_to()将pthread的执行流转入下一个bthread的任务函数
    ending_sched(&g);
}

【场景】以proxy为例首先在mixer-framework里已经调用过了bthread_start_background初始化一些必要的东西(tc、tg等),然后在send_minibs_request_impl函数中用bthread_start_background则是相当于在bthread内启动了一个新的bthread,想要执行函数func,由于这个新bth不是在pthread内起的,所以肯定会被放在该taskgroup的_remote_rq中,然后,这个阻塞在wait_task上的taskgroup会被唤醒,进入以下步骤取出tid来:

  • 首先从本taskgroup的_remote_rq队列中pop出一个tid(_remote_rq.pop(tid)),pop成功就直接返回
  • 如果pop失败了,调用steal_task从其它taskgroup中去偷一个tid,偷的顺序在上面

无论如何拿到tid后,进入taskgroup::sched_to去切栈,具体栈跳跃步骤为:

1、调用jump_stack进入新的bthread栈,由于这时新的bthread还没有分配栈,会首先为它创建一个栈new_stack

2、新的bthread创建完后,利用bthread_jump_fcontext修改栈顶指针、各寄存器进入新栈new_stack,并记录跳转前的旧栈stack_main的相关内容

3、新栈的执行函数fn为taskgroup::task_runner(注意这个就是用户的func,层层函数指针传入),这个task_runner主要会干3个事儿

  • 执行用户实际的回调函数func(用户想让bthread干的事)
  • 唤起等待该任务函数执行结束的pthread/bthread
  • ending_sched:尝试再pop出一个bthread来,如果现在没有新的bthread,就把_main_meta作为下一次的跳转对象(相当于返回去继续执行原来的代码),再次调用jump_stack由new_stack跳转入stack_main,相当于又拿到了一个tid继续运行

4、都运行完了继续下一次wait_task中的do while死循环,重新阻塞在_pl→wait(...)上

网站地图