前面已经介绍过了bthread的主要机制,但对于具体的调度细节没有过多涉及,本篇将在着重介绍下bhtread在各个worker(taskgroup)之间的调度方式。
在brpc里,有个和调度相关的重要类parkinglot,parking lot 本质上就是基于futex的wait/signal,以前没太多接触过futex,刚好趁着阅读brpc的源码熟悉下futex的机制。futex是linux一个重要的用户态和内核态混合的同步机制,网上介绍的文章有很多,完整阐述也比较复杂,这里就不赘述了,简单来说就是在没有竞争的时候不用切换到内核态,保证性能,只有确实有竞争的时候才切换,切换到内核态的开销比较大。
parking lot字面意思是停车场,估计是为了形象地表示woker可能会停在上面,这个类很简单,类变量只有一个_pending_signal:
这个就是用于wait和signal的futex变量,需要注意的是,留了最低位作为一个是否停止的标识。
类函数有如下四个:
signal是唤醒num_task个等待在_pending_signal上的线程,内部实现就是调用futex_wake_private,在调用之前对_pending_signal执行了原子加,加的是num_task << 1,之所以要左移是因为第一位是用于表明是否停止的标识位。启动一个bthread就会调用一次signal(1)。
get_state是获取用于wait的状态,就是直接返回_pending_signal的值,返回类型是state,一个parking lot的内部类,因为有一个int参数的构造函数,可以自动转换,如下:
wait是如果_pending_signal的当前值和先前拿到的expected_state.val相等的话就wait。内部调用的是futex_wait_private。
stop则是将停止的标识位置为1,然后唤醒所有wait的线程,这里的stop指的就是wait的stop,这也会让后续取的state的stopped()返回true。
为了避免竞争太激烈,brpc会用将worker分配到多个pl上去,task_group(worker)有一个parkinglot* _pl变量,记录了本worker所用的pl,初始化taskgroup的时候赋值如下,将pthreadid hash过后根据parking_lot_num取余,目前brpc用的是4个:
有的小伙伴可能有疑惑,上面说了每次signal(1),即便只有一个竞争是不是也还好。其实不然,比如以下两点:
1.加了bthread就会去去pl上调signal,fetch_add太密了也是会竞争影响性能的。
2.state和last_state一致的时候就直接wait了,否则会尝试去steal,设想一个极端场景,64个worker,同时分别开始执行一个bth,待执行的bth队列为空,这个时候加入了一个bth,会导致pl的state变化,如果只有一个pl,64个worker分别执行完自己的任务后都会去尝试steal,如果是4个pl,则只有16个会观察到变化的pl去steal,剩下的直接睡过去了。
taskgroup类里和bthread调度直接相关的两个主要函数为sched和sched_to,前者是让出当前tg按照调度规则从队列里调度下一个bt,后者是让出当前tg直接调度指定tg,sched如下:
work stealing queue(wsq)的push和pop都是在bottom一侧,而steal是在top一侧,
如果没有bthread_fair_wsq宏定义,会使用pop,否则是用steal,从fifo的角度来说,使用steal是更公平的,但是开销会更大。
下面就从一个woker的创建开始整体捋一下bhtread的调度,前面关于bthread机制的文章提到过,在taskcontrol的init里,会启动worker,worker入口函数是run_main_task(),核心代码如下:
wait_task函数就用到了上面说的_pl,这也是futex机制的典型使用方式,代码如下:
首先整个函数内容是一个死循环,也就是我们常说的spinlock,因为futex机制的特性,通常都是结合spinlock来使用,循环体里面,会根据bthread_dont_save_parking_state宏定义是否存在来确定执行的代码,如果没有定义bthread_dont_save_parking_state,说明要保存pl状态,则会根据上一次的steal_task里保存的状态来判断。关于bthread_dont_save_parking_state,_last_pl_state的定义和steal_task也用到了,如下:三者共同形成了两个分支:
这两个分支功能上没什么区别,为什么需要两个分支暂时还没有想明白,后面如果看懂了再补充。
总的来说,就是不断循环进行以下操作:
判断_pl是否处于停止状态,如果是,则直接返回-1,pl被调用stop()后会进入停止状态,正常运行过程中stop不会被调用。
尝试steal_task,如果取到了task则返回true,没取到则根据上次的状态进行wait,因为是在循环里,根据futex的机制,如果上一个state和当前_pl上的state一致,那么说明_pl上的任务没变化,继续steal没有意义,则wait,否则说明有其他地方调用_pl上的signal,也就是有新的任务加到某个队列里,_pending_signal也会发生变化,steal有可能成功。如果进入了wait,在_pl的signal被调用的时候也会被唤醒。
steal_task实质是调用taskcontrol单例的steal_task函数,如下:
依次按规则从各个woker里取,优先本地队列,后remote队列。
以上就是woker从创建到运行的机制,也就是一直循环取task然后执行,有可能wait在某个parking lot上等待其他的唤醒。
还有一种调度的情况是在bthread执行过程中调用bthread_yield让出当前的worker,如下:
实质是调用taskgroup::yield,如下:
首先是用set_ramained把 ready_to_run_in_worker和当前bthread注册到下个bhtread启动前的回调里,然后使用上面说到的sched调度下一个bthread,这样在下个bthread执行前会将让出worker的bthread放到队列里供调度执行。
总的来说,是基于futex机制实现的同步,并通过设置多个parking lot的方式减小了竞争。