基本介绍
初始化时b=t=1,代表队列为空,load是取值,store是赋值,push是从队尾添加(b 1),pop是从对首弹出(正常是b-1,有竞争了t 1),steal是从对首偷(t 1)。所以,b=t时队列为空,b=t 1时队列有1个元素。
竞争情况
由于对于此队列的push和pop操作发生在一次流程的两个阶段,所以不会产生竞争,push和steal不需要对一个元素竞争那么竞争就发生在pop和steal以及steal和steal中。比如:当前只有一个bth在队列里,tg1想从rq里pop一个,tg2想从rq里steal一个。
// workstealingqueue类成员
std::atomic baidu_cacheline_alignment _top; // 队列头部,用于worker直接放入bthread
std::atomic _bottom; // 队列尾部,用于其他worker去steal或者自己pop出来用
size_t _capacity; // 队列容量
t *_buffer; // 实际存放bthread的内存数组(连续)
bool push(const t& x) {
const size_t b = _bottom.load(base::memory_order_relaxed);
// 这里之所以使用acq是保证了这里读到的t一定是别的线程修改后或者修改前的值
const size_t t = _top.load(base::memory_order_acquire);
if (b >= t _capacity) { // full queue.
return false;
}
// 这里用了一个非常巧妙的 & 操作对下标做处理使得取元素时下标一直小于capacity(所以_capacity要求是偶数)
_buffer[b & (_capacity - 1)] = x;
// 这里rel保证了写_buffer之后这个b是精确 1的,即对应了steal的load acq
_bottom.store(b 1, base::memory_order_release);
return true;
}
// pop操作同样只和steal并发,核心逻辑是b-1之后锁定一个元素防止被steal取走,在只有一个元素的时候通过cas语义和steal竞争
bool pop(t *val)
{
const size_t b = _bottom.load(std::memory_order_relaxed);
size_t t = _top.load(std::memory_order_relaxed);
if (t >= b)
{
return false;
}
// 先让b-1,意思是假设我已经取了一个元素,那么我希望各bthread周知
const size_t new_b = b - 1;
_bottom.store(new_b, std::memory_order_relaxed);
// 这里的作用是atomic_thread_fence之前的所有操作会全局可见(从cpu的storebuffer至少刷新到l3 cache使多核可见)
std::atomic_thread_fence(std::memory_order_seq_cst);
// 保证当前t读的是最新值?如果不新,可能存在有线程已经steal走了但是我还取的旧值的情况
t = _top.load(std::memory_order_relaxed);
// 再次判断,此时说明队列为空,b回到原来的值
if (t > new_b)
{
_bottom.store(b, std::memory_order_relaxed);
return false;
}
*val = _buffer[new_b & (_capacity - 1)];
if (t != new_b) // 说明队列中还有元素,这时相当于已经用new_b锁住这个元素了,所以可以退出
{
// 那就是t < new_b,队列中元素不止一个,直接取出用就行
return true;
}
// 走到这里证明t = new_b,证明只剩一个元素了,需要和steal竞争
// 若_top等于t,则当前取线程取成功了,修改_top为t 1,返回true
// 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false
const bool popped = _top.compare_exchange_strong(
t, t 1, base::memory_order_seq_cst, base::memory_order_relaxed);
_bottom.store(b, base::memory_order_relaxed);
return popped;
}
// steal one item from the queue
bool steal(t *val)
{
size_t t = _top.load(std::memory_order_acquire);
size_t b = _bottom.load(std::memory_order_acquire);
// 队列为空直接返回false
if (t >= b)
{
return false;
}
do {
// 猜测:保证b读到的是最新的b
std::atomic_thread_fence(std::memory_order_seq_cst);
b = _bottom.load(std::memory_order_acquire);
if (t >= b)
{
// 空队列返回false
return false;
}
// 取元素
*val = _buffer[t & (_capacity - 1)];
} while (!_top.compare_exchange_strong(t, t 1, std::memory_order_seq_cst, std::memory_order_relaxed));
// 上面这个while的语义是:
// 若_top等于t,则当前线程取元素成功了,修改_top为t 1,返回true,退出循环走下来
// 若_top不等于t,则证明该元素被别人抢走了,将t改为_top,返回false,继续进入while
// 其他steal线程同理,谁拿到谁就t 1,然后告诉所有线程我取走这个元素了
return true;
}