软中断、tasklet 和工作队列并不是 Linux 内核中始终存在的机制,而是由更早版本的内核中的“下半部”(bottom half)演变而来。下半部的机制实际上包含五种,但 2.6 版本的内核中,下半部和工作队列的函数都隐没了,只剩下了前三者。介绍这三种下半部实现之前,有必要说一下上半部与下半部的区别。上半部指的是中断处理程序,下半部则指的是一些尽管与中断有相关性然而能够延后执行的工作。举个例子:在网络传输中,网卡接管到数据包这个事件不肯定须要马上被解决,适宜用下半部去实现;然而用户敲击键盘这样的事件就必须马上被响应,应该用中断实现。两者的次要区别在于:中断不能被雷同类型的中断打断,而下半部仍然能够被中断打断;中断对于工夫十分敏感,而下半部基本上都是一些能够提早的工作。因为二者的这种区别,所以对于一个工作是放在上半部还是放在下半部去执行,能够参考上面 4 条:
- 如果一个工作对工夫十分敏感,将其放在中断处理程序中执行。
- 如果一个工作和硬件相干,将其放在中断处理程序中执行。
- 如果一个工作要保障不被其余中断(特地是雷同的中断)打断,将其放在中断处理程序中执行。
- 其余所有工作,思考放在下半部去执行。有写内核工作须要延后执行,因而才有的下半部,进而实现了三种实现下半部的办法。这就是本文要探讨的 软中断 、tasklet 和工作队列。
下表能够更直观的看到它们之间的关系。
软中断
软中断作为下半部机制的代表,是随着 SMP(share memory processor)的呈现应运而生的,它也是 tasklet 实现的根底(tasklet 实际上只是在软中断的根底上增加了肯定的机制)。软中断个别是“可提早函数”的总称,有时候也包含了 tasklet(请读者在遇到的时候依据上下文推断是否蕴含 tasklet)。它的呈现就是因为要满足下面所提出的上半部和下半部的区别,使得对工夫不敏感的工作延后执行,而且能够在多个 CPU 上并行执行,使得总的零碎效率能够更高。它的个性包含:
- 产生后并不是马上能够执行,必须要期待内核的调度能力执行。软中断不能被本人打断(即单个 cpu 上软中断不能嵌套执行),只能被硬件中断打断(上半部)。
- 能够并发运行在多个 CPU 上(即便同一类型的也能够)。所以软中断必须设计为可重入的函数(容许多个 CPU 同时操作),因而也须要应用自旋锁来保其数据结构。
相干数据结构
- 软中断描述符 struct softirq_action{void (action)(struct softirq_action );}; 形容每一种类型的软中断,其中 void(*action)是软中断触发时的执行函数。
- 软中断全局数据和类型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
enum
{
HI_SOFTIRQ=0, /* 用于高优先级的 tasklet*/
TIMER_SOFTIRQ, /* 用于定时器的下半部 */
NET_TX_SOFTIRQ, /* 用于网络层发包 */
NET_RX_SOFTIRQ, /* 用于网络层收报 */
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ, /* 用于低优先级的 tasklet*/
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
NR_SOFTIRQS
};
相干 API
- 注册软中断
void open_softirq(int nr, void (*action)(struct softirq_action *))
即注册对应类型的处理函数到全局数组 softirq_vec 中。例如网络发包对应类型为 NET_TX_SOFTIRQ 的处理函数 net_tx_action.
- 触发软中断
void raise_softirq(unsigned int nr)
实际上即以软中断类型 nr 作为偏移量置位每 cpu 变量 irq_stat[cpu_id]的成员变量__softirq_pending,这也是同一类型软中断能够在多个 cpu 上并行运行的根本原因。
- 软中断执行函数
do_softirq-->__do_softirq
执行软中断处理函数__do_softirq 前首先要满足两个条件: (1)不在中断中 (硬中断、软中断和 NMI)。1 (2) 有软中断处于 pending 状态。零碎这么设计是为了防止软件中断在中断嵌套中被调用,并且达到在单个 CPU 上软件中断不能被重入的目标。对于 ARM 架构的 CPU 不存在中断嵌套中调用软件中断的问题,因为 ARM 架构的 CPU 在解决硬件中断的过程中是敞开掉中断的。只有在进入了软中断处理过程中之后才会开启硬件中断,如果在软件中断处理过程中有硬件中断嵌套,也不会再次调用软中断,because 硬件中断是软件中断处理过程中再次进入的,此时 preempt_count 曾经记录了软件中断!对于其它架构的 CPU,有可能在触发调用软件中断前,也就是还在解决硬件中断的时候,就曾经开启了硬件中断,可能会产生中断嵌套,在中断嵌套中是不容许调用软件中断解决的。Why?我的了解是,在产生中断嵌套的时候,表明这个时候是零碎突发忙碌的时候,内核第一要务就是连忙把中断中的事件解决实现,退出中断嵌套。防止屡次嵌套,哪里有工夫处理软件中断,所以把软件中断推延到了所有中断解决实现的时候能力触发软件中断。
须要 C /C++ Linux 服务器架构师学习材料加群 563998835(材料包含 C /C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),收费分享
实现原理和实例
软中断的调度机会:
- do_irq 实现 I / O 中断时调用 irq_exit。
- 零碎应用 I /O APIC, 在解决完本地时钟中断时。
- local_bh_enable,即开启本地软中断时。
- SMP 零碎中,cpu 解决完被 CALL_FUNCTION_VECTOR 处理器间中断所触发的函数时。
- ksoftirqd/ n 线程被唤醒时。上面以从中断解决返回函数 irq_exit 中调用软中断为例具体阐明。触发和初始化的的流程如图所示:
软中断解决流程
asmlinkage void __do_softirq(void)
{
struct softirq_action *h;
__u32 pending;
int max_restart = MAX_SOFTIRQ_RESTART;
int cpu;
pending = local_softirq_pending();
account_system_vtime(current);
__local_bh_disable((unsigned long)__builtin_return_address(0));
lockdep_softirq_enter();
cpu = smp_processor_id();
restart:
/* Reset the pending bitmask before enabling irqs */
set_softirq_pending(0);
local_irq_enable();
h = softirq_vec;
do {if (pending & 1) {int prev_count = preempt_count();
kstat_incr_softirqs_this_cpu(h - softirq_vec);
trace_softirq_entry(h, softirq_vec);
h->action(h);
trace_softirq_exit(h, softirq_vec);
if (unlikely(prev_count != preempt_count())) {
printk(KERN_ERR "huh, entered softirq %td %s %p"
"with preempt_count %08x,"
"exited with %08x?n", h - softirq_vec,
softirq_to_name[h - softirq_vec],
h->action, prev_count, preempt_count());
preempt_count() = prev_count;}
rcu_bh_qs(cpu);
}
h++;
pending >>= 1;
} while (pending);
local_irq_disable();
pending = local_softirq_pending();
if (pending && --max_restart)
goto restart;
if (pending)
wakeup_softirqd();
lockdep_softirq_exit();
account_system_vtime(current);
_local_bh_enable();}
- 首先调用 local_softirq_pending 函数获得目前有哪些位存在软件中断。
- 调用__local_bh_disable 敞开软中断,其实就是设置正在处理软件中断标记,在同一个 CPU 上使得不能重入__do_softirq 函数。
- 从新设置软中断标记为 0,set_softirq_pending 从新设置软中断标记为 0,这样在之后从新开启中断之后硬件中断中又能够设置软件中断位。
- 调用 local_irq_enable,开启硬件中断。
- 之后在一个循环中,遍历 pending 标记的每一位,如果这一位设置就会调用软件中断的处理函数。在这个过程中硬件中断是开启的,随时能够打断软件中断。这样保障硬件中断不会失落。
- 之后敞开硬件中断 (local_irq_disable),查看是否又有软件中断处于 pending 状态,如果是,并且在本次调用__do_softirq 函数过程中没有累计反复进入软件中断解决的次数超过 max_restart=10 次,就能够从新调用软件中断解决。如果超过了 10 次,就调用 wakeup_softirqd() 唤醒内核的一个过程来处理软件中断。设立 10 次的限度,也是为了防止影响零碎响应工夫。
- 调用_local_bh_enable 开启软中断。
软中断内核线程
之前咱们剖析的触发软件中断的地位其实是中断上下文中,而在软中断的内核线程中理论曾经是过程的上下文。这里说的软中断上下文指的就是零碎为每个 CPU 建设的 ksoftirqd 过程。软中断的内核过程中次要有两个大循环,外层的循环解决有软件中断就解决,没有软件中断就休眠。内层的循环处理软件中断,每循环一次都试探一次是否过长时间占据了 CPU,须要调度就开释 CPU 给其它过程。具体的操作在正文中做了解释。
set_current_state(TASK_INTERRUPTIBLE);
// 外层大循环。while (!kthread_should_stop()) {preempt_disable();// 禁止内核抢占,本人把握 cpu
if (!local_softirq_pending()) {preempt_enable_no_resched();
// 如果没有软中断在 pending 中就让出 cpu
schedule();
// 调度之后从新把握 cpu
preempt_disable();}
__set_current_state(TASK_RUNNING);
while (local_softirq_pending()) {
/* Preempt disable stops cpu going offline.
If already offline, we'll be on wrong CPU:
don't process */
if (cpu_is_offline((long)__bind_cpu))
goto wait_to_die;
// 有软中断则开始软中断调度
do_softirq();
// 查看是否须要调度,防止始终占用 cpu
preempt_enable_no_resched();
cond_resched();
preempt_disable();
rcu_sched_qs((long)__bind_cpu);
}
preempt_enable();
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
wait_to_die:
preempt_enable();
/* Wait for kthread_stop */
set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {schedule();
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
tasklet
因为软中断必须应用可重入函数,这就导致设计上的复杂度变高,作为设施驱动程序的开发者来说,减少了累赘。而如果某种利用并不需要在多个 CPU 上并行执行,那么软中断其实是没有必要的。因而诞生了补救以上两个要求的 tasklet。它具备以下个性:a)一种特定类型的 tasklet 只能运行在一个 CPU 上,不能并行,只能串行执行。b)多个不同类型的 tasklet 能够并行在多个 CPU 上。c)软中断是动态调配的,在内核编译好之后,就不能扭转。但 tasklet 就灵便许多,能够在运行时扭转(比方增加模块时)。tasklet 是在两种软中断类型的根底上实现的,因而如果不须要软中断的并行个性,tasklet 就是最好的抉择。也就是说 tasklet 是软中断的一种非凡用法,即 提早状况下的串行执行。
相干数据结构
- tasklet 描述符
struct tasklet_struct
{
struct tasklet_struct *next;// 将多个 tasklet 链接成单向循环链表
unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution) TASKLET_STATE_RUN(Tasklet is running (SMP only))
atomic_t count;//0: 激活 tasklet 非 0: 禁用 tasklet
void (*func)(unsigned long); // 用户自定义函数
unsigned long data; // 函数入参
};
- tasklet 链表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);// 低优先级
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);// 高优先级
相干 API
- 定义 tasklet
#define DECLARE_TASKLET(name, func, data)
struct tasklet_struct name = {NULL, 0, ATOMIC_INIT(0), func, data }
// 定义名字为 name 的非激活 tasklet
#define DECLARE_TASKLET_DISABLED(name, func, data)
struct tasklet_struct name = {NULL, 0, ATOMIC_INIT(1), func, data }
// 定义名字为 name 的激活 tasklet
void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
// 动静初始化 tasklet
- tasklet 操作
static inline void tasklet_disable(struct tasklet_struct *t)
// 函数临时禁止给定的 tasklet 被 tasklet_schedule 调度,直到这个 tasklet 被再次被 enable;若这个 tasklet 以后在运行, 这个函数忙期待直到这个 tasklet 退出
static inline void tasklet_enable(struct tasklet_struct *t)
// 使能一个之前被 disable 的 tasklet;若这个 tasklet 曾经被调度, 它会很快运行。tasklet_enable 和 tasklet_disable 必须匹配调用, 因为内核跟踪每个 tasklet 的 "禁止次数"
static inline void tasklet_schedule(struct tasklet_struct *t)
// 调度 tasklet 执行,如果 tasklet 在运行中被调度, 它在实现后会再次运行; 这保障了在其余事件被解决当中产生的事件受到应有的留神. 这个做法也容许一个 tasklet 从新调度它本人
tasklet_hi_schedule(struct tasklet_struct *t)
// 和 tasklet_schedule 相似,只是在更高优先级执行。当软中断解决运行时, 它解决高优先级 tasklet 在其余软中断之前,只有具备低响应周期要求的驱动才应应用这个函数, 可防止其他软件中断解决引入的附加周期.
tasklet_kill(struct tasklet_struct *t)
// 确保了 tasklet 不会被再次调度来运行,通常当一个设施正被敞开或者模块卸载时被调用。如果 tasklet 正在运行, 这个函数期待直到它执行结束。若 tasklet 从新调度它本人,则必须阻止在调用 tasklet_kill 前它从新调度它本人,如同应用 del_timer_sync
实现原理
- 调度原理
static inline void tasklet_schedule(struct tasklet_struct *t)
{if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
local_irq_save(flags);
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);// 退出低优先级列表
raise_softirq_irqoff(TASKLET_SOFTIRQ);// 触发软中断
local_irq_restore(flags);
}
- tasklet 执行过程 TASKLET_SOFTIRQ 对应执行函数为 tasklet_action,HI_SOFTIRQ 为 tasklet_hi_action,以 tasklet_action 为例阐明,tasklet_hi_action 大同小异。
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
local_irq_disable();
list = __get_cpu_var(tasklet_vec).head;
__get_cpu_var(tasklet_vec).head = NULL;
__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;// 获得 tasklet 链表
local_irq_enable();
while (list) {
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t)) {if (!atomic_read(&t->count)) {
// 执行 tasklet
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
// 如果 t ->count 的值不等于 0,阐明这个 tasklet 在调度之后,被 disable 掉了,所以会将 tasklet 构造体从新放回到 tasklet_vec 链表,并从新调度 TASKLET_SOFTIRQ 软中断,在之后 enable 这个 tasklet 之后从新再执行它
local_irq_disable();
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();}
}
工作队列
从下面的介绍看以看出,软中断运行在中断上下文中,因而不能阻塞和睡眠,而 tasklet 应用软中断实现,当然也不能阻塞和睡眠。但如果某提早处理函数须要睡眠或者阻塞呢?没关系工作队列就能够如您所愿了。把推后执行的工作叫做工作(work),形容它的数据结构为 work_struct,这些工作以队列构造组织成工作队列(workqueue),其数据结构为 workqueue_struct,而工作线程就是负责执行工作队列中的工作。零碎默认的工作者线程为 events。工作队列 (work queue) 是另外一种将工作推后执行的模式。工作队列能够把工作推后,交由一个内核线程去执行—这个下半局部总是会在过程上下文执行,但因为是内核线程,其不能拜访用户空间。最重要特点的就是 工作队列容许从新调度甚至是睡眠。通常,在工作队列和软中断 /tasklet 中作出抉择非常容易。可应用以下规定:– 如果推后执行的工作须要睡眠,那么只能抉择工作队列。– 如果推后执行的工作须要延时指定的工夫再触发,那么应用工作队列,因为其能够利用 timer 延时(内核定时器实现)。– 如果推后执行的工作须要在一个 tick 之内解决,则应用软中断或 tasklet,因为其能够抢占一般过程和内核线程,同时不可睡眠。– 如果推后执行的工作对提早的工夫没有任何要求,则应用工作队列,此时通常为无关紧要的工作。实际上,工作队列的实质就是将工作交给内核线程解决,因而其能够用内核线程替换。然而内核线程的创立和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以咱们也举荐应用工作队列。
相干数据结构
- 失常工作构造体
struct work_struct {
atomic_long_t data; // 传递给工作函数的参数
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry; // 链表构造,链接同一工作队列上的工作。work_func_t func; // 工作函数,用户自定义实现
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
// 工作队列执行函数的原型:void (*work_func_t)(struct work_struct *work);
// 该函数会由一个工作者线程执行,因而其在过程上下文中,能够睡眠也能够中断。但只能在内核中运行,无法访问用户空间。
- 提早工作构造体(提早的实现是在调度时提早插入相应的工作队列)
struct delayed_work {
struct work_struct work;
struct timer_list timer; // 定时器,用于实现提早解决
};
- 工作队列构造体
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; // 指针数组,其每个元素为 per-cpu 的工作队列
struct list_head list;
const char *name;
int singlethread; // 标记是否只创立一个工作者线程
int freezeable; /* Freeze threads during suspend */
int rt;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
- 每 cpu 工作队列(每 cpu 都对应一个工作者线程 worker_thread)
struct cpu_workqueue_struct {
spinlock_t lock;
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
struct workqueue_struct *wq;
struct task_struct *thread;
} ____cacheline_aligned;
相干 API
- 缺省工作队列
动态创立
DECLARE_WORK(name,function); // 定义失常执行的工作项
DECLARE_DELAYED_WORK(name,function);// 定义延后执行的工作项
动态创建
INIT_WORK(_work, _func) // 创立失常执行的工作项
INIT_DELAYED_WORK(_work, _func)// 创立延后执行的工作项
调度默认工作队列
int schedule_work(struct work_struct *work)// 对失常执行的工作进行调度,即把给定工作的处理函数提交给缺省的工作队列和工作者线程。工作者线程实质上是一个一般的内核线程,在默认状况下,每个 CPU 均有一个类型为“events”的工作者线程,当调用 schedule_work 时,这个工作者线程会被唤醒去执行工作链表上的所有工作。零碎默认的工作队列名称是:keventd_wq, 默认的工作者线程叫:events/n,这里的 n 是处理器的编号, 每个处理器对应一个线程。比方,单处理器的零碎只有 events/ 0 这样一个线程。而双处理器的零碎就会多一个 events/ 1 线程。默认的工作队列和工作者线程由内核初始化时创立:start_kernel()-->rest_init-->do_basic_setup-->init_workqueues
调度提早工作
int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay)
刷新缺省工作队列
void flush_scheduled_work(void)
// 此函数会始终期待,直到队列中的所有工作都被执行。勾销提早工作
static inline int cancel_delayed_work(struct delayed_work *work)
//flush_scheduled_work 并不勾销任何提早执行的工作,因而,如果要勾销提早工作,应该调用 cancel_delayed_work。
以上均是采纳缺省工作者线程来实现工作队列,其长处是简略易用,毛病是如果缺省工作队列负载太重,执行效率会很低,这就须要咱们创立本人的工作者线程和工作队列。
- 自定义工作队列
create_workqueue(name)
// 宏定义 返回值为工作队列,name 为工作线程名称。创立新的工作队列和相应的工作者线程,name 用于该内核线程的命名。int queue_work(struct workqueue_struct *wq, struct work_struct *work)
// 相似于 schedule_work,区别在于 queue_work 把给定工作提交给创立的工作队列 wq 而不是缺省队列。int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
// 调度提早工作。void flush_workqueue(struct workqueue_struct *wq)
// 刷新指定工作队列。void destroy_workqueue(struct workqueue_struct *wq)
// 开释创立的工作队列。
实现原理
- 工作队列的组织构造 即 workqueue_struct、cpu_workqueue_struct 与 work_struct 的关系。一个工作队列对应一个 work_queue_struct,工作队列中每 cpu 的工作队列由 cpu_workqueue_struct 示意,而 work_struct 为其上的具体工作。关系如下图所示:
2. 工作队列的工作过程
- 利用实例 linux 各个接口的状态 (up/down) 的音讯须要告诉 netdev_chain 上感兴趣的模块同时上报用户空间音讯。这里应用的就是工作队列。具体流程图如下所示:
- 是否处于中断中在 Linux 中是通过 preempt_count 来判断的, 具体如下:在 linux 零碎的过程数据结构里,有这么一个数据结构: #define preempt_count() (current_thread_info()->preempt_count) 利用 preempt_count 能够示意是否处于中断解决或者软件中断处理过程中, 如下所示:# define hardirq_count() (preempt_count() & HARDIRQ_MASK) #define softirq_count() (preempt_count() & SOFTIRQ_MASK) #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK)) #define in_irq() (hardirq_count()) #define in_softirq() (softirq_count()) #define in_interrupt() (irq_count())
preempt_count 的 8~23 位记录中断解决和软件中断处理过程的计数。如果有计数,示意零碎在硬件中断或者软件中断处理过程中。