乐趣区

面试必备进程同步机制内核自旋锁

进程 (线程) 间的同步机制是面试时的常见问题,所以准备用一个系列来好好整理下用户态与内核态的各种同步机制。本文就以内核空间的一种基础同步机制 —自旋锁 开始好了

自旋锁是什么

自旋锁就是一个二状态的原子 (atomic) 变量:

  • unlocked
  • locked

当任务 A 希望访问被自旋锁保护的临界区 (Critical Section),它首先需要这个自旋锁当前处于unlocked 状态,然后它会去尝试获取 (acquire) 这个自旋锁(将这个变量状态修改为locked),

如果在这之后有另一个任务 B 同样希望去访问这段这段临界区,那么它必须要等到任务 A 释放 (release) 掉自旋锁才行,在这之前,任务 B 会一直等待此处,不段尝试获取 (acquire),也就是我们说的 自旋 在这里。

自旋锁有什么特点

如果被问到这个问题,不少人可能根据上面的定义也能总结出来了:

  • “ 保护临界区 ”
  • “ 一直忙等待,直到锁被其他人释放 ”
  • “ 适合用在等待时间很短的场景中 ”

说错了吗?当然没有!并且这些的确都是自旋锁的特点,那么更多呢?

几个基本概念

为什么内核需要引入自旋锁?回答这个问题之前我想先简单引入以下几个基本概念:

UP & SMP

UP表示单处理器,SMP表示对称多处理器(多CPU)。一个处理器就视为一个执行单元,在任何一个时刻,只能运行在一个进程上下文或者中断上下文里。

中断(interrupt)

中断可以发生在任务的指令过程中,如果中断处于使能,会从任务所处的进程上下文切换到中断上下文,在中断上下文中进行所谓的中断处理(ISR)。

内核中使用 local_irq_disable()或者 local_irq_save(&flags) 来去使能中断。两者的区别是后者会将当前的中断使能状态先保存到 flags 中。

相反,内核使用 local_irq_enale() 来无条件的使能中断,而使用 local_irq_restore(&flags) 来恢复之前的中断状态。

无论是开中断还是关中断的函数都有 local 前缀, 这表示开关中断的只在当前 CPU 生效。

内核态抢占(preempt)

抢占 ,通俗的理解就是内核调度时, 高优先级的任务从低优先的任务中抢到CPU 的控制权,开始运行,其中又分为 用户态抢占 内核态抢占 , 本文需要关心的是 内核态抢占

早期版本 (比2.6 更早的)的内核还是非抢占式内核,也就是说当高优先级任务就绪时,除非低优先级任务主动放弃 CPU(比如阻塞或者主动调用 Schedule 触发调度),否则高优先级任务是没有机会运行的。

而在此之后,内核可配置为抢占式内核 (默认),在一些时机(比如说中断处理结束,返回内核空间时),会触发重新调度,此时高优先级的任务可以抢占原来占用CPU 的低优先级任务。

需要特别指出的是,抢占同样需要中断处于打开状态!

void __sched notrace preempt_schedule(void)
{struct thread_info *ti = current_thread_info();

    /*
     * If there is a non-zero preempt_count or interrupts are disabled,
     * we do not want to preempt the current task. Just return..
     */
    if (likely(ti->preempt_count || irqs_disabled()))
        return;

上面代码中的 preempt_count表示当前任务是否可被抢占,0表示可以被抢占,而大于 0 表示不可以。而 irqs_disabled 用来看中断是否关闭。

内核中使用 preemt_disbale() 来禁止抢占,使用 preempt_enable() 来使能可抢占。

单处理器上临界区问题

对于单处理器来说,由于任何一个时刻只会有一个执行单元,因此 不存在 多个执行单元同时访问临界区的情况。但是依然存在下面的情形需要保护

Case 1 任务上下文抢占

低优先级任务 A 进入临界区,但此时发生了调度 (比如发生了中断, 然后从中断中返回), 高优先级任务B 开始运行访问临界区。


解决方案:进入临界区前禁止抢占就好了。这样即使发生了中断,中断返回也只能回到任务A.

Case 2 中断上下文抢占

任务 A 进入临界区,此时发生了中断,中断处理函数中也去访问修改临界区。当中断处理结束时,返回任务 A 的上下文,但此时临界区已经变了!

解决方案:进入临界区前禁止中断(顺便说一句,这样也顺便禁止了抢占)

Case 3 多处理器上临界区问题

除了单处理器上的问题之外,多处理上还会面临一种需要保护的情形

其他 CPU 访问

任务 A 运行在 CPU_a 上,进入临界区前关闭了中断 (本地),而此时运行在CPU_b 上的任务 B 还是可以进入临界区!没有人能限制它

解决方案:任务 A 进入临界区前持有一个互斥结构,阻止其他 CPU 上的任务进入临界区,直到任务 A 退出临界区,释放互斥结构。

这个互斥结构就是自旋锁的来历。所以本质上,自旋锁就是为了针对 SMP 体系下的同时访问临界区而发明的!

内核中的自旋锁实现

接下来,我们来看一下内核中的自旋锁是如何实现的,我的内核版本是4.4.0

定义

内核使用 spinlock 结构表示一个自旋锁,如果不开调试信息的话,这个结构就是一个·raw_spinlock·:

typedef struct spinlock {
    union {
        struct raw_spinlock rlock;
        // code omitted
    };
} spinlock_t;

raw_spinlock 这个结构展开, 可以看到这是一个体系相关的 arch_spinlock_t 结构

typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
    // code omitted
} raw_spinlock_t;

本文只关心常见的 x86_64 体系来说,这种情况下上述结构可展开为

typedef struct qspinlock {atomic_t    val;} arch_spinlock_t;

上面的结构是 SMP 上的定义,对于 UParch_spinlock_t 就是一个空结构

typedef struct {} arch_spinlock_t;

啊,自旋锁就是一个原子变量 (修改这个变量会LOCK 总线,因此可以避免多个 CPU 同时对其进行修改)

API

内核使用 spin_lock_init 来进行自旋锁的初始化

# define raw_spin_lock_init(lock)                \
    do {*(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)
    
#define spin_lock_init(_lock)                \
do {                            \
    spinlock_check(_lock);                \
    raw_spin_lock_init(&(_lock)->rlock);        \
} while (0)

最终 val 会设置为0 (对于UP,不存在这个赋值)

内核使用 spin_lockspin_lock_irq 或者 spin_lock_irqsave 完成加锁操作;使用 spin_unlockspin_unlock_irq 或者 spin_unlock_irqsave 完成对应的解锁。

spin_lock / spin_unlock

static inline void spin_lock(spinlock_t *lock)
{raw_spin_lock(&lock->rlock);
}

对于 UP,raw_spin_lock 最后会展开为_LOCK

# define __acquire(x) (void)0

#define __LOCK(lock) \
  do {preempt_disable(); __acquire(lock); (void)(lock); } while (0)

可以看到,它就是单纯地禁止抢占。这是上面 Case 1 的解决办法

而对于 SMP, raw_spin_lock 会展开为

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

这里同样会禁止抢占,然后由于 spin_acquire 在没设置 CONFIG_DEBUG_LOCK_ALLOC 时是空操作, 所以关键的语句是最后一句,将其展开后是

#define LOCK_CONTENDED(_lock, try, lock) \
    lock(_lock)

所以,真正生效的是

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{__acquire(lock);
    arch_spin_lock(&lock->raw_lock);
}

__acquire并不重要。而 arch_spin_lock 定义在 include/asm-generic/qspinlock.h. 这里会检查val,如果当前锁没有被持有(值为0), 那么就通过原子操作将其修改为1 并返回。

否则就调用 queued_spin_lock_slowpath 一直自旋。

#define arch_spin_lock(l)        queued_spin_lock(l)

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
    u32 val;

    val = atomic_cmpxchg(&lock->val, 0, _Q_LOCKED_VAL);
    if (likely(val == 0))
        return;
    queued_spin_lock_slowpath(lock, val);
}

以上就是 spin_lock() 的实现过程,可以发现除了我们熟知的等待自旋操作之外,它会在之前先调用 preempt_disable 禁止抢占,不过它并没有禁止中断,也就是说,它可以解决前面说的 Case 1Case 3

Case 2 还是有问题!

使用这种自旋锁加锁方式时,如果本地 CPU 发生了中断,在中断上下文中也去获取该自旋锁,这就会导致 死锁

因此,使用 spin_lock()需要保证知道该锁不会在该 CPU 的中断中使用 (其他CPU 的中断没问题)

解锁时成对使用的 spin_unlock 基本就是加锁的逆向操作,在设置了 val 重新为 0 之后,使能抢占。

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{spin_release(&lock->dep_map, 1, _RET_IP_);
    do_raw_spin_unlock(lock);
    preempt_enable();}

spin_lock_irq / spin_unlock_irq

这里我们就只关注 SMP 的情形了,相比之前的 spin_lock 中调用__raw_spin_lock, 这里多出的一个操作的就是禁止中断。

static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{local_irq_disable();   // 多了一个中断关闭
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

前面说过,实际禁止中断的时候也就不会发生抢占了,那么这里其实使用 preemt_disable 禁止抢占是个有点多余的动作。

关于这个问题,可以看以下几个连接的讨论
CU 上的讨论
Stackoverflow 上的回答
linux DOC

对于的解锁操作是 spin_unlock_irq 会调用__raw_spin_unlock_irq。相比前一种实现方式,多了一个local_irq_enable

static inline void __raw_spin_unlock_irq(raw_spinlock_t *lock)
{spin_release(&lock->dep_map, 1, _RET_IP_);
    do_raw_spin_unlock(lock);
    local_irq_enable();
    preempt_enable();}

这种方式也就解决了Case 2

spin_lock_irqsave / spin_unlock_irqsave

spin_lock_irq还有什么遗漏吗?它没有遗漏,但它最后使用 local_irq_enable 打开了中断,如果进入临界区前中断本来是关闭,那么通过这一进一出,中断竟然变成打开的了!这显然不合适!

因此就有了 spin_lock_irqsave 和对应的spin_unlock_irqsave. 它与上一种的区别就在于加锁时将中断使能状态保存在了flags

static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
    unsigned long flags;

    local_irq_save(flags);   // 保存中断状态到 flags
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    do_raw_spin_lock_flags(lock, &flags);
    
    return flags;
}

而在对应的解锁调用时,中断状态进行了恢复,这样就保证了在进出临界区前后,中断使能状态是不变的。

static inline void __raw_spin_unlock_irqrestore(raw_spinlock_t *lock,
                        unsigned long flags)
{spin_release(&lock->dep_map, 1, _RET_IP_);
    do_raw_spin_unlock(lock);
    local_irq_restore(flags);   // 从 flags 恢复
    preempt_enable();}

总结

  • 内核自旋锁的主要用于 SMP 系统上的临界区保护,并且在 UP 系统上也有简化的实现
  • 内核自旋锁与 抢占 中断 的关系密切
  • 内核自旋锁在内核有多个API, 实际使用时可以灵活使用。
退出移动版