菜鸟笔记
提升您的技术认知

brpc 笔记

bthread(一) 前言bthread(二) 线程模型及bthreadbthread(三) bthread数据结构bthread(四) bthread用户接口和代码执行路径bthread(五) 无锁队列rq的代码实现bthread(六) 小结brpc的精华bthread源码剖析brpc介绍、编译与使用brpc源码解析(一)—— rpc服务添加以及服务器启动主要过程brpc源码解析(二)—— brpc收到请求的处理过程brpc源码解析(三)—— 请求其他服务器以及往socket写数据的机制brpc源码解析(四)—— bthread机制brpc源码解析(五)—— 基础类resource pool详解brpc源码解析(六)—— 基础类socket详解brpc源码解析(七)—— worker基于parkinglot的bthread调度brpc源码解析(八)—— 基础类eventdispatcher详解brpc源码解析(九)—— 基础类workstealingqueue详解brpc源码解析(十)—— 核心组件bvar详解(1)简介和整体架构brpc源码解析(十一)—— reducer类和adder类解析brpc源码解析(十二)—— 核心组件bvar详解 agentgroup类详解brpc源码解析(十三)—— 核心组件bvar详解(4)combiner详解brpc源码解析(十四)—— 核心组件bvar详解 sampler详解brpc源码解析(十五)—— bthread栈创建和切换详解brpc源码解析(十六)—— 作为client的连接建立和处理详解brpc源码解析(十七)—— bthread上的类futex同步组件butex详解brpc源码解析(十八)—— mpsc队列executionqueue详解brpc源码解析(十九)—— 双buffer数据结构doublybuffereddata详解brpc源码解析(二十)—— 用于访问下游的channel类详解

bthread(五) 无锁队列rq的代码实现-ag真人游戏

阅读 : 438

基本介绍

初始化时b=t=1,代表队列为空,load是取值,store是赋值,push是从队尾添加(b 1),pop是从对首弹出(正常是b-1,有竞争了t 1),steal是从对首偷(t 1)。所以,b=t时队列为空,b=t 1时队列有1个元素。

竞争情况

由于对于此队列的push和pop操作发生在一次流程的两个阶段,所以不会产生竞争,push和steal不需要对一个元素竞争那么竞争就发生在pop和steal以及steal和steal中。比如:当前只有一个bth在队列里,tg1想从rq里pop一个,tg2想从rq里steal一个。

// workstealingqueue类成员
std::atomic baidu_cacheline_alignment _top;   // 队列头部,用于worker直接放入bthread
std::atomic _bottom;                          // 队列尾部,用于其他worker去steal或者自己pop出来用
size_t _capacity;                                   // 队列容量
t *_buffer;                                         // 实际存放bthread的内存数组(连续)
 
bool push(const t& x) {
    const size_t b = _bottom.load(base::memory_order_relaxed);
    // 这里之所以使用acq是保证了这里读到的t一定是别的线程修改后或者修改前的值
    const size_t t = _top.load(base::memory_order_acquire);
    if (b >= t   _capacity) { // full queue.
        return false;
    }
    // 这里用了一个非常巧妙的 & 操作对下标做处理使得取元素时下标一直小于capacity(所以_capacity要求是偶数)
    _buffer[b & (_capacity - 1)] = x;
    // 这里rel保证了写_buffer之后这个b是精确 1的,即对应了steal的load acq
    _bottom.store(b   1, base::memory_order_release);
    return true;
}
 
// pop操作同样只和steal并发,核心逻辑是b-1之后锁定一个元素防止被steal取走,在只有一个元素的时候通过cas语义和steal竞争
bool pop(t *val)
{
    const size_t b = _bottom.load(std::memory_order_relaxed);
    size_t t = _top.load(std::memory_order_relaxed);
    if (t >= b)
    {
        return false;
    }
    // 先让b-1,意思是假设我已经取了一个元素,那么我希望各bthread周知
    const size_t new_b = b - 1;
    _bottom.store(new_b, std::memory_order_relaxed);
    // 这里的作用是atomic_thread_fence之前的所有操作会全局可见(从cpu的storebuffer至少刷新到l3 cache使多核可见)
    std::atomic_thread_fence(std::memory_order_seq_cst);
    // 保证当前t读的是最新值?如果不新,可能存在有线程已经steal走了但是我还取的旧值的情况
    t = _top.load(std::memory_order_relaxed);
    // 再次判断,此时说明队列为空,b回到原来的值
    if (t > new_b)
    {
        _bottom.store(b, std::memory_order_relaxed);
        return false;
    }
    *val = _buffer[new_b & (_capacity - 1)];
    if (t != new_b) // 说明队列中还有元素,这时相当于已经用new_b锁住这个元素了,所以可以退出
    {
        // 那就是t < new_b,队列中元素不止一个,直接取出用就行
        return true;
    }
    // 走到这里证明t = new_b,证明只剩一个元素了,需要和steal竞争
    // 若_top等于t,则当前取线程取成功了,修改_top为t 1,返回true
    // 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false
    const bool popped = _top.compare_exchange_strong(
        t, t   1, base::memory_order_seq_cst, base::memory_order_relaxed);
    _bottom.store(b, base::memory_order_relaxed);
    return popped;
}
 
// steal one item from the queue
bool steal(t *val)
{
    size_t t = _top.load(std::memory_order_acquire);
    size_t b = _bottom.load(std::memory_order_acquire);
    // 队列为空直接返回false
    if (t >= b)
    {
        return false;
    }
    do {
        // 猜测:保证b读到的是最新的b
        std::atomic_thread_fence(std::memory_order_seq_cst);
        b = _bottom.load(std::memory_order_acquire);
        if (t >= b)
        {
            // 空队列返回false
            return false;
        }
        // 取元素
        *val = _buffer[t & (_capacity - 1)];
    } while (!_top.compare_exchange_strong(t, t   1, std::memory_order_seq_cst, std::memory_order_relaxed));
    // 上面这个while的语义是:
    // 若_top等于t,则当前线程取元素成功了,修改_top为t 1,返回true,退出循环走下来
    // 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false,继续进入while
    // 其他steal线程同理,谁拿到谁就t 1,然后告诉所有线程我取走这个元素了
    return true;
}

 

网站地图