锁
编写代码时,我们以及习惯了用锁去保护数据。那么,这里的锁是什么?为什么它能满足我们的要求?它存在于哪里?
让我们从一个最简单的例子出发---多个线程并发修改一个全局变量:
/* 全局变量 */
int g_sum = 0;
/* 每个线程入口 */
void *thread(void* arg)
{
for(int i = 0; i < 100; i )
{
g_sum ;
}
return null;
}
在多核
处理器上,如果有两个线程同时执行上面的累加操作,最终的g_sum
几乎不可能是预期的200
(每个线程累加100
次),而更倾向于是一个接近200
的随机值。
这是因为cpu
对g_sum
进行累加时,它们都会:1
.从内存中读取 2
.修改它的值 3
.将新值写回内存。由于cpu
之间是独立的,而内存是共享的,所以就有可能存在一种时序:两个cpu
先后从内存中读取了g_sum
的值,并各自对它进行了递增,最终将新的值写入g_sum
,这时。两个线程的两次累加最终只让g_sum
增加了1
临界区
要解决上面的问题,一个很自然的想法同一时间段内,要想办法只让一个线程对全局变量进行读-修改-写。我们可以用锁
去保护临界区
这里引入了临界区
的概念。临界区是指访问共用资源的程序片段(比如上面的例子中的"g_sum ")。线程在进入临界区时加锁,退出临界区时解锁。也就是说,锁将临界区"保护"了起来。
临界区
是人们为一段代码片段强加上的概念,但加锁
和解锁
不一样,它必须实打实地存在于代码中。那么问题来了,锁
应该如何实现 ?
为了回答这个问题,我们先将锁
需要具有的特性列出来:
1
. 它需要支持加锁(lock
)和解锁(unlock
)两种操作。2
. 它需要是有状态(state
)的,它需要记录当前这把锁处于locked
还是unlocked
状态。3
. 锁的状态变化必须是原子(atomic)的4
. 当它处于locked
状态时,对其进行加锁(lock
)的操作,不会成功。
第1
条,对实现者来说,一是要提供两个api
分别对应这两种操作。
第2
条,需要一个地方能记录锁的状态,对计算机系统来说,这个地方只能是内存
。
第3
条,将锁
的状态记录在内存中有个和全局变量一样的问题,那就是如何避免多个线程同时去改变锁的状态 ? 总不能用锁
去保护锁
吧 ? 好在各个体系的cpu
都提供了这种原子操作的原语, 对x86
来说,就是指令的lock
前缀, 它可以在执行指令时控制住总线,直到指令执行完成。这也就保证了锁
的状态修改是通过原子
操作完成的。
第4
条,加锁操作成功的前提是锁
的状态是处于"unlocked",如果该条件不满足,则本次加锁操作失败,那么失败以后的行为呢?不同的锁有不同的实现,一般来说有三种可选择的行为:1
.立即返回失败 2
.不断尝试再加锁,直到成功. 3
. 睡眠线程自己,直到可以获得锁。
典型实现
当然,我们并不需要去重复造锁
的轮子。
在用户空间,glibc
提供了诸如spinlock
、semaphore
、rwlock
、mutex
类型的锁的实现,我们只要使用api
就行。
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
.......
在内核空间,linux
也有类似的实现.
性能损失
在刚才的例子中,如果我们使用了锁
去保护g_sum
,那么最终一定能得到200
。但是,我们在得到准确结果的同时也会付出性能的代价。
如果把临界区
比作成一个独木桥,那么线程就是需要过独木桥的人。 显然,如果过桥的人(并发访问临界区的线程)越多,独木桥越长(锁保护的临界区的范围越大),那么其他人等地就越久(性能就下降地越厉害)。
下面这是在一台8
核cpu
虚拟机环境下,测试程序的运行结果。
横坐标是并发运行的线程的数目,纵坐标是完成相同任务(累加一定次数)时的运行时间。越多的线程会带来越多的冲突,因此,总的运行时间会逐渐增大。
如果增加临界区的长度呢(在每次循环中增加一些额外指令),则会得到下面的结果:
横坐标表示额外的指令,纵坐标依然表示时间。
可见,线程的并发越多、临界区越大都会造成程序性能下降。这也是为什么追求性能的程序会选择使用每cpu变量
(或者每线程变量),并且尽量减小锁保护的粒度。
futex
前面说过,锁
是有状态的,并且这个状态需要保存在内存中。那么?具体到linux
平台,锁
对象是保存在内核空间还是用户空间呢? 在比较早的内核(2.5.7)中,这个对象是保存在内核中的,这是很自然的做法。因为当一个线程(task
)去等待获得一个互斥锁时,如果获取不到,那么它需要将积极睡眠,直到锁
可用后再被唤醒。
这个过程具体来说,就是将自己的task_struct
挂到锁
对象的等待链表上。当锁
的持有者unlock
时,内核就可以从该等待列表上找到并唤醒链表上所有task
。
可见,每次用户的加锁解锁操作都必须陷入内核(即使现在没有其他线程持有这把锁)。陷入内核意味着几百个时钟就消耗了。在冲突不大的场景中,这种消耗就白白浪费了。
因此,从2.5.7
版本开始,linux
引入了futex
(fast userspace mutexes
),即快速的用户态互斥机制,这个机制是用户态和内核态共同协作完成的,它将保存锁
状态的对象放在用户态。如果用户在加锁时发现锁
处于(unlocked
)状态,那么就直接修改状态就好了(fast path
),不需要陷入内核。当然,如果此时锁处于(locked
)状态,还是需要陷入内核(slow path
)。
那么我们如何使用futex
机制呢?答案是我们完全不需要显示地使用,glibc
库中的semaphore
、mutex
底层就是使用的futex
。
无锁
锁
是通过一个状态的原子操作来保证共享数据的访问互斥。而无锁
的意思就是不需要这样一个状态。
cas
说到无锁
,必须提到的就是cas
指令(也可以叫csw
)。cas
是compareandswap
的缩写,即比较-交换。不同体系的cpu
有不同的cas
的指令实现。在x86
上,就是带lock
前缀的cmpxchg
指令。所以,cas
操作是原子的
它的功能用伪代码描述就是下面这样(仅为理解,实际是一条原子指令):
bool compare_and_swap(int *src, int *dest, int newval)
{
if (*src == *dest) {
*src = newval;
return true;
} else {
return false;
}
}
第一个操作数的内容与第二个操作数的内容相比较, 如果相同,则将第三个操作数赋值给第一个操作数,返回true
, 否则返回false
。
较新版本的gcc
已经内置了cas
操作的api
(如下)。其他编译器也提供了类似的api
,不过这不是本文的重点。
bool __sync_bool_comware_and_swap(type *ptr, type oldval, type newval);
基于链表的无锁队列
无锁
通常构建无锁队列(lock-free queue
)。顾名思义,无锁队列就是指不使用锁结构
来控制多线程并发互斥的队列。
我们知道,队列是一个典型的先入先出(fifo
)的数据结构,具有入队(enqueue
)和出队(dequeue
)两种操作。并发条件下,多个线程可能在入队或出队时会产生竞争。
以单向链表为基础实现的队列如下图所示(有一个dummy
链表头),线程1和线程2都希望自己能完成入队操作
通常来说,入队要完成两件事:
如果可以使用锁
,我们可以通过将以上两件事放到一个锁
的保护范围内就能完成线程的互斥,那么对于无锁呢?
john d.valois
在《implemeting lock-free queues》中提出的无锁队列的入队列算法如下(伪代码):
enqueue(x)
{
/* 创建新的节点 n */
n = new node();
n->value = x;
n->next = null;
do {
t = tail; // 取得尾节点
succ = cas(t->next, null, n) // 尝试更新尾节点的next指向新的节点
if succ != true
cas(tail, t, t->next) // 更新失败,尝试将tail向后走
}while(succ != true);
cas(tail, t, n); // 更新队列的tail指针,使它指向新的节点
}
这里的enqueue
算法中使用了三次cas
操作。
1
. 第一次cas
操作更新尾节点的next指向新的节点。如果在单线程环境中,这个操作必定成功。但在多线程环境,如果有多个线程都在进行enqueue
操作,那么在线程t1取得尾节点后,线程t2可能已经完成了新节点的入队,此时t1的cas
操作就会失败,因为此时t->next
已经不为null
了,而变成了t2新插入的节点。
再强调一遍,cas
操作会锁住总线!因此t1和t2只有一个线程会成功,成功的线程会更新尾节点的next
,另一个线程会因为cas
失败而重新循环。
如果cas
操作成功,链表会变成下面这样,此时的tail
指针还没有更新
2
. 如果第一个cas
失败,说明有其他线程在坏事(进行了元素入队),这个时候第二个cas
操作会尝试推进tail
指针。这样做是为了防止第一个cas
成功的线程突然挂掉而导致不更新tail
指针
3
. 第三个cas
操作更新尾节点的next
论文中还给出了另一个版本的入队算法,如下所示
enqueue2(x)
{
/* 创建新的节点 n */
n = new node();
n->value = x;
n->next = null;
oldt = t = tail
do {
while(t->next != null) // 不断向后到达队列尾部
t = t->next
}while(cas(t->next, null, n) != true); // 更新尾节点的next指向新的节点
cas(tail, oldt, n); // 更新队列的tail指针,使它指向新的节点
}
与前一个的版本相比,新版本在循环内部增加了不断向后遍历的过程,也就是如果tail
指针后面已经有被其他线程添加了节点,本线程并不会等待tail
更新,而是直接向后遍历。
再来看出队,论文中给出的出队算法如下:
dequeue()
{
do {
h = head;
if h->next = null
error queue_empty;
}while (cas(head, h, h->next)!= true)
return h->next->value;
}
需要特别注意,该出队算法不是返回队首的元素,而是返回head->next
节点。完成出队后,移动head
指针到刚出队的元素。算法中使用了一个cas
操作来控制竞争下的head
指针更新。另外,算法中并没有描述队列元素的资源释放。
基于数组的无锁队列
以链表为基础的无锁队列有一个缺点就是内存的频繁申请和释放,在一些语言实现中,这种申请释放本身就是带锁的。包含有锁操作的行为自然称不上是无锁。因此,更通用的无锁队列是基于数组实现的。论文中描述了一种基于数组的无锁队列算法,它具有以下一些特性:
1
. 数组预先分配好,也就是能容纳的元素个数优先
2
. 使用者可以将值填入数组,除此之外,数组有三个特殊值:head
, tail
和empty
。队列初始化时(下图),除了有两个相邻的位置是填入head
, tail
之外,其他位置都是empty
。显然,用户数据不能再使用这三个值了。。
- 入队操作:假设用户希望将一个值
x
入队,它会找到tail
的位置,然后对该位置和之后的位置执行一次double-word cas
。该操作将<tail
,empty
>原子地替换为<x
,tail
>。当然,如果tail
后面不是empty
(而是head`),就说明队列满了,入队失败。 - 出队操作:找到
head
的位置,同样利用double-word cas
,将<head
,x
>替换为<empty
,head
>。当然如果head
后面是empty
,则出队失败(此时队列是空的)。 - 为了快速找到
head
和tail
的位置,算法使用两个变量记录入队和出队发生的次数,显然这两个变量的改变都是原子递增的。
在某个时刻,队列可能是下面这个样子
我也用cas
操作实现了一个队列,但是没有用论文中的算法。而更偏向于dpdk
的实现。
struct headtail{
volatile uint32_t head;
volatile uint32_t tail;
};
struct queue{
struct headtail prod;
struct headtail cons;
int array[queue_size];
int capacity;
};
int cas_enqueue(struct queue* queue, int val)
{
uint32_t head;
uint32_t idx;
bool succ;
do{
head = queue->prod.head;
if (queue->capacity queue->cons.tail - head < 1)
{
/* queue is full */
return -1;
}
/* move queue->prod.head */
succ = cas(&queue->prod.head, head, head 1);
}while(!succ);
idx = head & queue->capacity;
/* set val */
queue->array[idx] = val;
/* wait */
while(unlikely(queue->prod.tail != head))
{
_mm_pause();
}
queue->prod.tail = head 1;
return 0;
}
int cas_dequeue(struct queue* queue, int* pval)
{
uint32_t head;
uint32_t idx;
bool succ;
do {
head = queue->cons.head;
if (queue->prod.tail - head < 1)
{
/* queue is empty */
return -1;
}
/* forward queue->head */
succ = cas(&queue->cons.head, head, head 1);
}while(!succ);
idx = head & queue->capacity;
*pval = queue->array[idx];
/* wait */
while(unlikely(queue->cons.tail != head))
{
_mm_pause();
}
/* move cons tail */
queue->cons.tail = head 1;
return 0;
}
总结
无论是锁
还是无锁
,其实都是一种多线程环境下的同步方式,锁
的应用更为广泛,而无锁
更有一种自旋的味道在里面,在特定场景下的确能提高性能,比如dpdk
中ring
实际就是无锁队列的应用