一般无锁队列的情况分为两种,第一种是单个消费者与单个生产者,第二种是多个消费者或者多个生产着的情况。
一.单个消费者与单个生产者的情况
这种情况下可以用环形队列ringbuffer来实现无锁队列,比如dpdk和kfifo的无锁队列就是用环形队列实现的,kfifo里面的入队和出队的处理很巧妙,大家可以去看看。
dpdk:https://www.coonote.com/cplusplus-note/lockless-circular-queue.html
kfifo:https://www.coonote.com/cplusplus-note/linux-kfifo.html
用数组模拟环形队列举个简单的例子:
#include
#include
#include
#include
using namespace std;
#define ring_queue_size 10
template
class ringbuffer {
public:
ringbuffer(int size): m_size(size),m_head(0), m_tail(0) {
m_buf = new t[size];
}
~ringbuffer() {
delete [] m_buf;
m_buf = null;
}
inline bool isempty() const {
return m_head == m_tail;
}
inline bool isfull() const {
return m_tail == (m_head 1) % m_size; //取模是为了考虑队列尾的特殊情况
}
bool push(const t& value) { //以实例的方式传值
if(isfull()) {
return false;
}
m_buf[m_head] = value;
m_head = (m_head 1) % m_size;
return true;
}
bool push(const t* value) { //以指针的方式传值
if(isfull()) {
return false;
}
m_buf[m_head] = *value;
m_head = (m_head 1) % m_size;
return true;
}
inline bool pop(t& value)
{
if(isempty()) {
return false;
}
value = m_buf[m_tail];
m_tail = (m_tail 1) % m_size;
return true;
}
inline unsigned int head()const {
return m_head;
}
inline unsigned int tail()const {
return m_tail;
}
inline unsigned int size()const {
return m_size;
}
private:
int m_size; // 队列大小
int m_head; // 队列头部索引
int m_tail; // 队列尾部索引
t* m_buf; // 队列数据缓冲区
};
typedef struct node { //任务节点
int cmd;
void *value; //可根据情况改变
}tasknode;
#if 1
void produce(ringbuffer* rqueue) {
int i = 0;
for(i=0;ipush(node);
}
}
void consume(ringbuffer* rqueue) {
while(!rqueue->isempty()) {
tasknode node;
rqueue->pop(node);
}
}
#endif
int main()
{
ringbuffer * rqueue = new ringbuffer(ring_queue_size);
std::thread producer(produce,rqueue);
std::thread consumer(consume,rqueue);
producer.join();
consumer.join();
delete rqueue;
return 0;
}
二.多个生产者或者多个消费者的情况
这种情况一般都是出现在多线程开发的场合,会有多个对象同时操作队列,此时为了避免这种情况,可以有两种方式,第一种就是最常见的加锁,第二种就是无锁队列,用原子操作实现,无锁数据结构依赖很重要的技术就是cas操作——compare & set,或是 compare & swap,基本的思路就是先与内存中的值进行比较,如果相同就进行set或者swap操作。
常见的gcc内置原子操作有下面这些。
- type __sync_fetch_and_add (type *ptr, type value, ...)
// 将value加到*ptr上,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_sub (type *ptr, type value, ...)
// 从*ptr减去value,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_or (type *ptr, type value, ...)
// 将*ptr与value相或,结果更新到*ptr, 并返回操作之前*ptr的值 - type __sync_fetch_and_and (type *ptr, type value, ...)
// 将*ptr与value相与,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_xor (type *ptr, type value, ...)
// 将*ptr与value异或,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_fetch_and_nand (type *ptr, type value, ...)
// 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之前*ptr的值 - type __sync_add_and_fetch (type *ptr, type value, ...)
// 将value加到*ptr上,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_sub_and_fetch (type *ptr, type value, ...)
// 从*ptr减去value,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_or_and_fetch (type *ptr, type value, ...)
// 将*ptr与value相或, 结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_and_and_fetch (type *ptr, type value, ...)
// 将*ptr与value相与,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_xor_and_fetch (type *ptr, type value, ...)
// 将*ptr与value异或,结果更新到*ptr,并返回操作之后新*ptr的值 - type __sync_nand_and_fetch (type *ptr, type value, ...)
// 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之后新*ptr的值 - bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
// 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回true - type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
// 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回操作之前*ptr的值 - __sync_synchronize (...)
// 发出完整内存栅栏 - type __sync_lock_test_and_set (type *ptr, type value, ...)
// 将value写入*ptr,对*ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义 - void __sync_lock_release (type *ptr, ...)
// 将0写入到*ptr,并对*ptr解锁。即,unlock spinlock语义
下面就用环形数组来实现无锁队列,举个例子,当然用环形链表也可以,但是链表要考虑到一个aba的问题(这个后面再介绍分析)。
#include
#include
#include
#include
#include
using namespace std;
#define ring_queue_size 10
//std::mutex mtx;
template
class ringbuffer {
public:
ringbuffer(int size): m_size(size),m_head(0), m_tail(0) {
m_buf = new t[size];
}
~ringbuffer() {
delete [] m_buf;
m_buf = null;
}
inline bool isempty() const {
return m_head == m_tail;
}
inline bool isfull() const {
return m_tail == (m_head 1) % m_size; //取模是为了考虑队列尾的特殊情况
}
/*使用互斥锁实现并发情况共用队列*/
# if 0
bool mut_push(const t& value);
bool mut_push(const t* value);
bool mut_pop(t& value);
#endif
/*使用原子操作实现并发情况无锁队列*/
bool cas_push(const t& value);
bool cas_push(const t* value);
bool cas_pop(t& value);
inline unsigned int head()const {
return m_head;
}
inline unsigned int tail()const {
return m_tail;
}
inline unsigned int size()const {
return m_size;
}
private:
int m_size; // 队列大小
int m_head; // 队列头部索引
int m_tail; // 队列尾部索引
t* m_buf; // 队列数据缓冲区
};
# if 0 //互斥锁实现
template
bool ringbuffer::mut_push(const t& value) {
while (mtx.try_lock()) {
if(isfull()) {
return false;
}
m_buf[m_head] = value;
m_head = (m_head 1) % m_size;
mtx.unlock();
return true;
}
}
template
bool ringbuffer::mut_push(const t* value) {
while (mtx.try_lock()) {
if(isfull()) {
return false;
}
m_buf[m_head] = *value;
m_head = (m_head 1) % m_size;
mtx.unlock();
return true;
}
}
template
bool ringbuffer::mut_pop(t& value)
{
while (mtx.try_lock()) {
if(isempty()) {
return false;
}
value = m_buf[m_tail];
m_tail = (m_tail 1) % m_size;
mtx.unlock();
return true;
}
}
#endif
template
bool ringbuffer::cas_push(const t& value) {
if(isfull()) {
return false;
}
int oldvalue,newvalue;
do{
oldvalue = m_head;
newvalue = (oldvalue 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue) != true);
m_buf[oldvalue] = value;
return true;
}
template
bool ringbuffer::cas_push(const t* value) {
if(isfull()) {
return false;
}
int oldvalue,newvalue;
do{
oldvalue = m_head;
newvalue = (oldvalue 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue) != true);
m_buf[oldvalue] = *value;
return true;
}
template
bool ringbuffer::cas_pop(t& value)
{
if(isempty()) {
return false;
}
int oldvalue,newvalue;
do{
oldvalue = m_tail;
newvalue = (oldvalue 1) % m_size;
}while(__sync_bool_compare_and_swap(&m_tail,oldvalue,newvalue) != true);
value = m_buf[oldvalue];
return true;
}
typedef struct node { //任务节点
int cmd;
void *value;
}tasknode;
void produce(ringbuffer* rqueue) {
int i = 0;
for(i=0;icas_push(node);
}
}
void consume(ringbuffer* rqueue) {
while(!rqueue->isempty()) {
tasknode node;
rqueue->cas_pop(node);
}
}
int main()
{
int i = 0;
ringbuffer * rqueue = new ringbuffer(ring_queue_size);
std::thread producer[20];
std::thread consumer[20];
for(i = 0; i<20; i ) {
producer[i] = std::thread(produce,rqueue);
consumer[i] = std::thread(consume,rqueue);
}
for (auto &thread : producer)
thread.join();
for (auto &thread : consumer)
thread.join();
delete rqueue;
return 0;
}
这段代码中无锁队列实现的关键就是__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue)函数,它会时刻将m_head和oldvalue比较,如果被其它线程抢先改了head的值就会返回失败。
现在说一下前面提到的aba的问题,基本的情况是下面这样的,多出现在内存复用的时候:
1.假设队列中只有一个节点m,线程a进行出队列操作,取得了当前节点m,节点m的地址是0x123,线程a被线程b抢占打断,还没进行出队列的实际操作
2.线程b同样将节点m出队列了,然后又重新申请了一个节点n,巧的是节点n的地址也是0x123,线程b将n节点入队列
3.线程b又被线程a打断,此时线程a重新开始执行,但是cas操作的接口比较的是地址,线程a发现节点的地址没有改变,又将n节点出队列了。
这种情况就好比电视据中经常出现的剧情,男主和女主的行李箱发生的交互,但是互不知晓。
通常这种情况会用双重cas来进行保证,在加一个计数器,使用节点内存引用计数refcnt,来判断值是否是原来的。