- 关于spinlock
我们在知道什么是spinlock之前,还需要知道为什么需要这个spinlock?
spinlock本质就是锁,提到锁,我们就回到了多线程编程的混沌初期,为了实现多线程编程,操作系统引入了锁。通过锁能够保证在多核多线程情况下,对临界区资源进行保护,从而保证操作数据的一致性。
-
锁
那么我们来温习下操作系统中5个知名的锁概念,每个技术都有适合自己的应用场景,此处引入介绍不再进一步深入展开。
-
信号量(semaphore)
linux中的信号量是一种睡眠锁。如有一个任务试图获得一个已被持有的信号量时,信号量会将其推进等待队列,然后让其睡眠。当持有信号量的进程将信号量开释后,在等待队列中的一个任务将被唤醒,从而便可以获得这个信号量。
信号量分为二元信号量和多元信号量,所谓二元信号量就是指该信号量只有两个状态,要么被占用,要么空闲;而多元信号量则允许同时被n个线程占有,超出n个外的占用请求将被阻塞。信号量是“系统级别”的,即同一个信号量可以被不同的进程访问。 -
互斥量 (mutex)
和二元信号量类似,不同的是互斥量的获取和释放必须是在同一个线程中进行的。如果一个线程不能去释放一个并不是它所占有的互斥量。而信号量是可以由其它线程进行释放的。
-
临界区(critical section)
把获取临界区的锁称为进入临界区,而把锁的释放称为离开临界区。spinlock就是为了保护这临界区。
-
读写锁(read-write lock)
如果程序大部分时间都是在读取,使用前面的锁时,每次读也要申请锁的,会导致其他线程就无法再对此段数据进行同步读取。我们知道对数据进行读取时,不存在数据同步问题的,那么这些读锁就影响了程序的性能。读写锁的出现就是为了解决这个问题的。
读写锁,有两种获取方式:共享(shared)或独占 (exclusive)。如果当前读写锁处于空闲状态,那么当多个线程同时以共享方式访问该读写锁时,都可以成功;而如果一个线程以独占的方式访问该读写锁,那么它会等待所有共享访问都结束后才可以成功。在读写锁被独占的过程中,再次共享和独占请求访问该锁,都会进行等待状态。 -
条件变量(condition variable)
条件变量相当于一种通知机制。多个线程可以设置等待该条件变量,一旦另外的线程设置了该条件变量(相当于唤醒条件变量)后,多个等待的线程就可以继续执行了。
以上是操作系统相关的几个概念,信号量也好互斥量也罢,只是不同的手段来实现资源的保护,实际还是根据真实应用需求的来选择。
-
spinlock
我们来看下spinlock, spinlock叫做自旋锁,最初针对smp系统,实现在smp多处理器情况下临界区保护。
主要作用是给临界数据加锁,从而保护临界数据不被同时访问,实现多任务的同步。如果临界数据当前不可访问,那么就自旋直到可以访问为止。
自旋锁和互斥锁存在差异的是自旋锁不会引起调用者睡眠,如果自旋锁无法获取,那么调用者一直循环检测自旋锁直到释放。
spinlock的工作方式本身就体现了它的优缺点,优点是执行速度快,不涉及上下文切换;缺点是耗费cpu资源。
在linux内核中,自旋锁通常用于包含内核数据结构的操作,可以看到在许多内核数据结构中都嵌入有spinlock,这些大部分就是用于保证它自身被操作的原子性(原子操作atomic operation为"不可被中断的一个或一系列操作",最后其实是通过底层硬件来保证的),在操作这样的结构体时都经历这样的过程:上锁-操作-解锁。
因为在现代处理器系统中,考虑到中断、内核抢占以及其他处理器的访问,所以spinlock自旋锁应该阻止在代码运行过程中出现的其他并发干扰。
-
中断
中断会触发中断例程的执行,如果中断例程访问了临界区,这就可能会有大量中断进来不断触发中断例程来进入临界区,那么临界区的原子性就被打破了。所以如果在中断例程中存在访问某个临界区的代码,那么就必须用中断禁用spinlock保护。(不同的中断类型(硬件中断和软件中断)对应于不同版本的自旋锁实现)
-
内核抢占
我们知道linux内核在2.6以后,支持内核抢占。这种情况下进入临界区就需要避免因内核抢占造成的并发,使用禁用抢占(preempt_disable())的spinlock,结束后再开启抢占(preempt_enable())。
-
多处理器的访问
在多个物理处理器系统,肯定会有多个进程的并发访问内存。这样就需要在内存加一个标志,每个需要进入临界区的代码都必须检查这个标志,看是否有进程已经在这个临界区中。当然每个系统都有一套自己的实现方案。
其实在内核代码中针对以上几点都设计了针对的spinlock版本,开发者只要根据不同场景选择不同版本即可。
-
内核代码定义
与spinlock 相关的文件可以查看内核源码中的include/linux文件夹,主要是include/linux/spinlock.h提供spinlock通用的spinlock和rwlock申明。定义了不同的spinlock版本,例如,以下下函数均定义在spinlock.h文件中。
如果在中断例程中不会操作临界区,可以用如下版本
static __always_inline void spin_lock(spinlock_t *lock)
static __always_inline void spin_unlock(spinlock_t *lock)
在软件中断中操作临界区使用如下spinlock版本:
static inline void spin_lock_bh(spinlock_t *lock)
static inline void spin_unlock_bh(spinlock_t *lock)
如果在硬件中断中操作临界区使用如下spinlock版本:
static inline void spin_lock_irq(spinlock_t *lock)
static inline void spin_unlock_irq(spinlock_t *lock)
如果在控制硬件中断的时候需要同时保存中断状态使用如下spinlock版本:
spin_lock_irqsave(lock, flags)
spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
获得自旋锁和释放自旋锁有好几个版本,对开发同学来说知道什么样的情况下使用什么版本的获得和释放锁的宏是非常必要的。
例如:如果被保护的临界资源只在进程上下文访问和软中断(包括tasklet、timer)中访问,那么对于这种情况,对共享资源的访问必须使用spin_lock_bh和spin_unlock_bh来处理。不过使用spin_lock_irq和spin_unlock_irq以及spin_lock_irqsave和spin_unlock_irqrestore也可以,它们会同时失效本地硬中断,隐式地也失效了软中断。但是使用spin_lock_bh和spin_unlock_bh是最合适的,它比其他两个快。
spin_lock阻止在不同cpu上的执行单元对共享资源的同时访问以及不同进程上下文互相抢占导致的对共享资源的非同步访问,而中断失效和软中断失效却是为了阻止在同一cpu上软中断或中断对共享资源的非同步访问。
此外根据内核配置中config_smp和config_debug_spinlock,会定义不同的函数,如下:
#if defined(config_smp) || defined(config_debug_spinlock)
# include#else
# include#endif -
关于spinlock注意点
a) 因为spin_lock的使用会浪费cpu资源(因为busy_loop),为了尽可能地消除spin_lock的负面影响,使用spin_lock保护临界区代码尽可能精炼,确保能尽早从临界区出来。
b)如果临界区可能包含引起睡眠的代码则不能使用自旋锁,否则可能引起死锁。万一进程在临界区引发睡眠后,那么后面嗷嗷待哺的那些正在spinlock的进程咋办,它们正等着进入临界区呢?等到猴年马月呢,就触发死锁了。
c)此外频繁的检测锁会让流水线上充满读操作引起cpu对流水线的重排,从而进一步影响性能。如果在循环的中加个pause指令,让cpu暂停n个周期,则可以释放cpu的一些计算资源,让同一个核心上的另一个超线程使用,提升性能。针对这块可以翻阅intel的官方材料:
64-ia-32-architectures-optimization-manual.pdf
以上内容介绍了操作系统中的锁的类型、种类,以及spinlock锁的工作机制和注意点。下面我们聚焦在于当系统中出现spinlock高的时候如何找到问题元凶。 -
spinlock问题模拟
在操作系统中模拟spinlock我们借助posix threads。这个是在多核平台上进行并行编程的一套常用的api。pthreads提供了多种锁机制:
(1) mutex(互斥量):pthread_mutex_*** (2) spin lock(自旋锁):pthread_spin_*** (3) condition variable(条件变量):pthread_con_*** (4) read/write lock(读写锁):pthread_rwlock_*** 首先定义一个自旋锁:spinlock_t x;
然后初始化:spin_lock_init(spinlock_t *x); //自旋锁在使用前需要先初始化
使用后销毁它:spin_destroy(&lock);
当然不过不想用这些api,可以自己实现spinlock,然后再调用之也行。
#include
#include
#include#include
#include
int numcount = 0;
pthread_spinlock_t lock;
using namespace std;
void thread_proc()
{
for (int i = 0; i < 100000000; i) {
pthread_spin_lock(&lock);
numcount;
pthread_spin_unlock(&lock);
}
}
int main()
{
pthread_spin_init(&lock, pthread_process_private); //maybe phread_process_private or pthread_process_shared
std::thread t1(thread_proc);
t1.join();
std::cout << "numcount:" << numcount << std::endl;
pthread_spin_destroy(&lock);
return 0;
}
编译 #g -std=c 11 -lpthread spinlock_t.cpp
执行(循环时间可以增大便于监控)之后我们发现:
通过perf top可以看到95.11%是pthread_spin_lock.
这是我们代码中定义的pthread_spin_lock函数,该函数就是在我们使用使用的库libpthread,就样因果关系就对应起来了。
不过为什么在sys中使用率几乎是0%呢?
因为我们在代码中保护的是用户层的数据,并没有切入到内核态。如我们在前面所描述,linux中自旋锁是用于保护内核数据结构的,当到内核态时候不停自锁就会被监控命令累积到sys上了。
知道函数之后,就可以在源码中找到对应的代码位置进行分析了。
此外可以使用pstack和gdb工具。
用pstack可以显示进程的栈跟踪,用来来确定进程挂起的位置。
gdb是gnu开源组织发布的程序调试工具,用来调试 c 和 c 程序。可以使程序开发者在程序运行时观察程序的内部结构和内存的使用情况。 -
spinlock损耗
然后在代码中加上时间戳后,对比测试了一组数据。
线程数量从1个线程计算增加到40个线程,每个线程的工作内容为累加到1000千万,40线程会将结果累积到4亿。计算每次i增加到i 1的平均时间,就可以理解成spinlock的损耗。我们发现时间变化如下,其中横坐标为线程数量,纵坐标为每次加法的时间损耗,单位为us。
我们发现在40线程下每次加法消耗的时间要比1个线程下每次加法消耗高出几十倍来,虽然投入的cpu资源增加了,但是更多的是在spinlock上消耗了。
通过这样一组实验,对spinlock损耗有进一步的认识,并可得出这样一个结论:当一个临界资源被更多的线程共享争用时候,在并发增加时会导致平均每次操作的时间损耗增加。
所以在一个共享资源争用厉害的业务场景,在不优化争用资源的情况下,一直增加负载反会让业务响应性能更差。 -
小结
因为以上问题是基于自身模拟的问题,所以在定位思路上难免有作弊之嫌疑。不过通过了解spinlock,并深入如何使用spinlock之后,对自旋锁本身有了更深刻的认识。后续我们看到spinlock情况的时候可以更加大胆的来找问题原因了。
- 附录spinlock c 实现
#include class spin_lock { private: std::atomic < bool > flag = atomic_var_init (false); public: spin_lock () = default; spin_lock (const spin_lock &) = delete; spin_lock & operator= (const spin_lock) = delete; void lock () { //acquire spin lock bool expected = false; while (!flag.compare_exchange_strong (expected, true)); expected = false; } void unlock () { //release spin lock flag.store (false); } }; int num = 0; spin_lock sm; int main () { for (int i = 0; i < 10000000; i) { sm.lock (); num; sm.unlock (); } return 0; } 编译命令:g -std=c 11 –lpthread **.cpp 附录模拟mutex mutex的使用方法和spinlock大同小异。 #include #include #include#include #include int num = 0; pthread_mutex_t mutex = pthread_mutex_initializer; void thread_proc () { for (int i = 0; i < 1000000; i) { pthread_mutex_lock (&mutex); num; pthread_mutex_unlock (&mutex); } } int main () { std::thread t1 (thread_proc); t1.join (); std::cout << "num:" << num << std::endl; pthread_mutex_destroy (&mutex); //maybe you always foget this return 0; }
编译命令:
g -std=c 11 -lpthread mutex.cpp
<
ol>
#include
#include
#include
#include
#include
int numcount = 0;
pthread_spinlock_t lock;
using namespace std;
int64_t get_current_timestamp()
{
struct timeval now = { 0, 0 };
gettimeofday(&now, null);
return now.tv_sec * 1000 * 1000 now.tv_usec;
}
void thread_proc()
{
for (int i = 0; i < 100000000; i) {
pthread_spin_lock(&lock);
numcount;
pthread_spin_unlock(&lock);
}
}
int main()
{
pthread_spin_init(&lock, pthread_process_private); //maybe phread_process_private or pthread_process_shared
int64_t start = get_current_timestamp();
std::thread t1(thread_proc), t2(thread_proc);
t1.join();
t2.join();
std::cout << "numcount:" << numcount << std::endl;
int64_t end = get_current_timestamp();
std::cout << "cost:" << end - start << std::endl;
pthread_spin_destroy(&lock);
return 0;
}