关于linux:Linux内核中的软中断tasklet和工作队列详解

软中断、tasklet和工作队列并不是Linux内核中始终存在的机制,而是由更早版本的内核中的“下半部”(bottom half)演变而来。下半部的机制实际上包含五种,但2.6版本的内核中,下半部和工作队列的函数都隐没了,只剩下了前三者。 介绍这三种下半部实现之前,有必要说一下上半部与下半部的区别。 上半部指的是中断处理程序,下半部则指的是一些尽管与中断有相关性然而能够延后执行的工作。举个例子:在网络传输中,网卡接管到数据包这个事件不肯定须要马上被解决,适宜用下半部去实现;然而用户敲击键盘这样的事件就必须马上被响应,应该用中断实现。 两者的次要区别在于:中断不能被雷同类型的中断打断,而下半部仍然能够被中断打断;中断对于工夫十分敏感,而下半部基本上都是一些能够提早的工作。因为二者的这种区别,所以对于一个工作是放在上半部还是放在下半部去执行,能够参考上面4条:

  1. 如果一个工作对工夫十分敏感,将其放在中断处理程序中执行。
  2. 如果一个工作和硬件相干,将其放在中断处理程序中执行。
  3. 如果一个工作要保障不被其余中断(特地是雷同的中断)打断,将其放在中断处理程序中执行。
  4. 其余所有工作,思考放在下半部去执行。 有写内核工作须要延后执行,因而才有的下半部,进而实现了三种实现下半部的办法。这就是本文要探讨的软中断tasklet工作队列

下表能够更直观的看到它们之间的关系。

软中断

软中断作为下半部机制的代表,是随着SMP(share memory processor)的呈现应运而生的,它也是tasklet实现的根底(tasklet实际上只是在软中断的根底上增加了肯定的机制)。软中断个别是“可提早函数”的总称,有时候也包含了tasklet(请读者在遇到的时候依据上下文推断是否蕴含tasklet)。它的呈现就是因为要满足下面所提出的上半部和下半部的区别,使得对工夫不敏感的工作延后执行,而且能够在多个CPU上并行执行,使得总的零碎效率能够更高。它的个性包含:

  • 产生后并不是马上能够执行,必须要期待内核的调度能力执行。软中断不能被本人打断(即单个cpu上软中断不能嵌套执行),只能被硬件中断打断(上半部)。
  • 能够并发运行在多个CPU上(即便同一类型的也能够)。所以软中断必须设计为可重入的函数(容许多个CPU同时操作),因而也须要应用自旋锁来保其数据结构。

相干数据结构

  • 软中断描述符 struct softirq_action{ void (action)(struct softirq_action );}; 形容每一种类型的软中断,其中void(*action)是软中断触发时的执行函数。
  • 软中断全局数据和类型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;  
    enum  
    {  
       HI_SOFTIRQ=0, /*用于高优先级的tasklet*/  
       TIMER_SOFTIRQ, /*用于定时器的下半部*/  
       NET_TX_SOFTIRQ, /*用于网络层发包*/  
       NET_RX_SOFTIRQ, /*用于网络层收报*/  
       BLOCK_SOFTIRQ,  
       BLOCK_IOPOLL_SOFTIRQ,  
       TASKLET_SOFTIRQ, /*用于低优先级的tasklet*/  
       SCHED_SOFTIRQ,  
       HRTIMER_SOFTIRQ,  
       RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */  
       NR_SOFTIRQS  
   };

相干API

  • 注册软中断
void open_softirq(int nr, void (*action)(struct softirq_action *))

即注册对应类型的处理函数到全局数组softirq_vec中。例如网络发包对应类型为NET_TX_SOFTIRQ的处理函数net_tx_action.

  • 触发软中断
void raise_softirq(unsigned int nr)

实际上即以软中断类型nr作为偏移量置位每cpu变量irq_stat[cpu_id]的成员变量__softirq_pending,这也是同一类型软中断能够在多个cpu上并行运行的根本原因。

  • 软中断执行函数
do_softirq-->__do_softirq

执行软中断处理函数__do_softirq前首先要满足两个条件: (1)不在中断中(硬中断、软中断和NMI) 。1 (2)有软中断处于pending状态。 零碎这么设计是为了防止软件中断在中断嵌套中被调用,并且达到在单个CPU上软件中断不能被重入的目标。对于ARM架构的CPU不存在中断嵌套中调用软件中断的问题,因为ARM架构的CPU在解决硬件中断的过程中是敞开掉中断的。只有在进入了软中断处理过程中之后才会开启硬件中断,如果在软件中断处理过程中有硬件中断嵌套,也不会再次调用软中断,because硬件中断是软件中断处理过程中再次进入的,此时preempt_count曾经记录了软件中断!对于其它架构的CPU,有可能在触发调用软件中断前,也就是还在解决硬件中断的时候,就曾经开启了硬件中断,可能会产生中断嵌套,在中断嵌套中是不容许调用软件中断解决的。Why?我的了解是,在产生中断嵌套的时候,表明这个时候是零碎突发忙碌的时候,内核第一要务就是连忙把中断中的事件解决实现,退出中断嵌套。防止屡次嵌套,哪里有工夫处理软件中断,所以把软件中断推延到了所有中断解决实现的时候能力触发软件中断。

须要C/C++ Linux服务器架构师学习材料加群563998835(材料包含C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),收费分享

实现原理和实例

软中断的调度机会:

  1. do_irq实现I/O中断时调用irq_exit。
  2. 零碎应用I/O APIC,在解决完本地时钟中断时。
  3. local_bh_enable,即开启本地软中断时。
  4. SMP零碎中,cpu解决完被CALL_FUNCTION_VECTOR处理器间中断所触发的函数时。
  5. ksoftirqd/n线程被唤醒时。 上面以从中断解决返回函数irq_exit中调用软中断为例具体阐明。 触发和初始化的的流程如图所示:

软中断解决流程

asmlinkage void __do_softirq(void)
{
    struct softirq_action *h;
    __u32 pending;
    int max_restart = MAX_SOFTIRQ_RESTART;
    int cpu;

    pending = local_softirq_pending();
    account_system_vtime(current);

    __local_bh_disable((unsigned long)__builtin_return_address(0));
    lockdep_softirq_enter();

    cpu = smp_processor_id();
restart:
    /* Reset the pending bitmask before enabling irqs */
    set_softirq_pending(0);

    local_irq_enable();

    h = softirq_vec;

    do {
        if (pending & 1) {
            int prev_count = preempt_count();
            kstat_incr_softirqs_this_cpu(h - softirq_vec);

            trace_softirq_entry(h, softirq_vec);
            h->action(h);
            trace_softirq_exit(h, softirq_vec);
            if (unlikely(prev_count != preempt_count())) {
                printk(KERN_ERR "huh, entered softirq %td %s %p"
                       "with preempt_count %08x,"
                       " exited with %08x?n", h - softirq_vec,
                       softirq_to_name[h - softirq_vec],
                       h->action, prev_count, preempt_count());
                preempt_count() = prev_count;
            }

            rcu_bh_qs(cpu);
        }
        h++;
        pending >>= 1;
    } while (pending);

    local_irq_disable();

    pending = local_softirq_pending();
    if (pending && --max_restart)
        goto restart;

    if (pending)
        wakeup_softirqd();

    lockdep_softirq_exit();

    account_system_vtime(current);
    _local_bh_enable();
}
  1. 首先调用local_softirq_pending函数获得目前有哪些位存在软件中断。
  2. 调用__local_bh_disable敞开软中断,其实就是设置正在处理软件中断标记,在同一个CPU上使得不能重入__do_softirq函数。
  3. 从新设置软中断标记为0,set_softirq_pending从新设置软中断标记为0,这样在之后从新开启中断之后硬件中断中又能够设置软件中断位。
  4. 调用local_irq_enable,开启硬件中断。
  5. 之后在一个循环中,遍历pending标记的每一位,如果这一位设置就会调用软件中断的处理函数。在这个过程中硬件中断是开启的,随时能够打断软件中断。这样保障硬件中断不会失落。
  6. 之后敞开硬件中断(local_irq_disable),查看是否又有软件中断处于pending状态,如果是,并且在本次调用__do_softirq函数过程中没有累计反复进入软件中断解决的次数超过max_restart=10次,就能够从新调用软件中断解决。如果超过了10次,就调用wakeup_softirqd()唤醒内核的一个过程来处理软件中断。设立10次的限度,也是为了防止影响零碎响应工夫。
  7. 调用_local_bh_enable开启软中断。

软中断内核线程

之前咱们剖析的触发软件中断的地位其实是中断上下文中,而在软中断的内核线程中理论曾经是过程的上下文。 这里说的软中断上下文指的就是零碎为每个CPU建设的ksoftirqd过程。 软中断的内核过程中次要有两个大循环,外层的循环解决有软件中断就解决,没有软件中断就休眠。内层的循环处理软件中断,每循环一次都试探一次是否过长时间占据了CPU,须要调度就开释CPU给其它过程。具体的操作在正文中做了解释。

set_current_state(TASK_INTERRUPTIBLE);
    //外层大循环。
    while (!kthread_should_stop()) {
        preempt_disable();//禁止内核抢占,本人把握cpu
        if (!local_softirq_pending()) {
            preempt_enable_no_resched();
            //如果没有软中断在pending中就让出cpu
            schedule();
            //调度之后从新把握cpu
            preempt_disable();
        }

        __set_current_state(TASK_RUNNING);

        while (local_softirq_pending()) {
            /* Preempt disable stops cpu going offline.
               If already offline, we'll be on wrong CPU:
               don't process */
            if (cpu_is_offline((long)__bind_cpu))
                goto wait_to_die;
            //有软中断则开始软中断调度
            do_softirq();
            //查看是否须要调度,防止始终占用cpu
            preempt_enable_no_resched();
            cond_resched();
            preempt_disable();
            rcu_sched_qs((long)__bind_cpu);
        }
        preempt_enable();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    __set_current_state(TASK_RUNNING);
    return 0;

wait_to_die:
    preempt_enable();
    /* Wait for kthread_stop */
    set_current_state(TASK_INTERRUPTIBLE);
    while (!kthread_should_stop()) {
        schedule();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    __set_current_state(TASK_RUNNING);
    return 0;

tasklet

因为软中断必须应用可重入函数,这就导致设计上的复杂度变高,作为设施驱动程序的开发者来说,减少了累赘。而如果某种利用并不需要在多个CPU上并行执行,那么软中断其实是没有必要的。因而诞生了补救以上两个要求的tasklet。它具备以下个性: a)一种特定类型的tasklet只能运行在一个CPU上,不能并行,只能串行执行。 b)多个不同类型的tasklet能够并行在多个CPU上。 c)软中断是动态调配的,在内核编译好之后,就不能扭转。但tasklet就灵便许多,能够在运行时扭转(比方增加模块时)。 tasklet是在两种软中断类型的根底上实现的,因而如果不须要软中断的并行个性,tasklet就是最好的抉择。也就是说tasklet是软中断的一种非凡用法,即提早状况下的串行执行

相干数据结构

  • tasklet描述符
struct tasklet_struct
{
      struct tasklet_struct *next;//将多个tasklet链接成单向循环链表
      unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution)  TASKLET_STATE_RUN(Tasklet is running (SMP only))
      atomic_t count;//0:激活tasklet 非0:禁用tasklet
      void (*func)(unsigned long); //用户自定义函数
      unsigned long data;  //函数入参
};
  • tasklet链表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);//低优先级
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);//高优先级

相干API

  • 定义tasklet
#define DECLARE_TASKLET(name, func, data) 
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
//定义名字为name的非激活tasklet
#define DECLARE_TASKLET_DISABLED(name, func, data) 
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data } 
//定义名字为name的激活tasklet
void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
//动静初始化tasklet
  • tasklet操作
static inline void tasklet_disable(struct tasklet_struct *t)
//函数临时禁止给定的tasklet被tasklet_schedule调度,直到这个tasklet被再次被enable;若这个tasklet以后在运行, 这个函数忙期待直到这个tasklet退出
static inline void tasklet_enable(struct tasklet_struct *t)
//使能一个之前被disable的tasklet;若这个tasklet曾经被调度, 它会很快运行。tasklet_enable和tasklet_disable必须匹配调用, 因为内核跟踪每个tasklet的"禁止次数"
static inline void tasklet_schedule(struct tasklet_struct *t)
//调度 tasklet 执行,如果tasklet在运行中被调度, 它在实现后会再次运行; 这保障了在其余事件被解决当中产生的事件受到应有的留神. 这个做法也容许一个 tasklet 从新调度它本人
tasklet_hi_schedule(struct tasklet_struct *t)
//和tasklet_schedule相似,只是在更高优先级执行。当软中断解决运行时, 它解决高优先级 tasklet 在其余软中断之前,只有具备低响应周期要求的驱动才应应用这个函数, 可防止其他软件中断解决引入的附加周期.
tasklet_kill(struct tasklet_struct *t)
//确保了 tasklet 不会被再次调度来运行,通常当一个设施正被敞开或者模块卸载时被调用。如果 tasklet 正在运行, 这个函数期待直到它执行结束。若 tasklet 从新调度它本人,则必须阻止在调用 tasklet_kill 前它从新调度它本人,如同应用 del_timer_sync 

实现原理

  • 调度原理
static inline void tasklet_schedule(struct tasklet_struct *t)
{
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
        __tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
    unsigned long flags;

    local_irq_save(flags);
    t->next = NULL;
    *__get_cpu_var(tasklet_vec).tail = t;
    __get_cpu_var(tasklet_vec).tail = &(t->next);//退出低优先级列表
    raise_softirq_irqoff(TASKLET_SOFTIRQ);//触发软中断
    local_irq_restore(flags);
}
  • tasklet执行过程 TASKLET_SOFTIRQ对应执行函数为tasklet_action,HI_SOFTIRQ为tasklet_hi_action,以tasklet_action为例阐明,tasklet_hi_action大同小异。
static void tasklet_action(struct softirq_action *a)
{
    struct tasklet_struct *list;

    local_irq_disable();
    list = __get_cpu_var(tasklet_vec).head;
    __get_cpu_var(tasklet_vec).head = NULL;
    __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;//获得tasklet链表
    local_irq_enable();

    while (list) {
        struct tasklet_struct *t = list;

        list = list->next;

        if (tasklet_trylock(t)) {
            if (!atomic_read(&t->count)) {
                //执行tasklet
                if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
                    BUG();
                t->func(t->data);
                tasklet_unlock(t);
                continue;
            }
            tasklet_unlock(t);
        }
        //如果t->count的值不等于0,阐明这个tasklet在调度之后,被disable掉了,所以会将tasklet构造体从新放回到tasklet_vec链表,并从新调度TASKLET_SOFTIRQ软中断,在之后enable这个tasklet之后从新再执行它
        local_irq_disable();
        t->next = NULL;
        *__get_cpu_var(tasklet_vec).tail = t;
        __get_cpu_var(tasklet_vec).tail = &(t->next);
        __raise_softirq_irqoff(TASKLET_SOFTIRQ);
        local_irq_enable();
    }
}

工作队列

从下面的介绍看以看出,软中断运行在中断上下文中,因而不能阻塞和睡眠,而tasklet应用软中断实现,当然也不能阻塞和睡眠。但如果某提早处理函数须要睡眠或者阻塞呢?没关系工作队列就能够如您所愿了。 把推后执行的工作叫做工作(work),形容它的数据结构为work_struct ,这些工作以队列构造组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。零碎默认的工作者线程为events。 工作队列(work queue)是另外一种将工作推后执行的模式。工作队列能够把工作推后,交由一个内核线程去执行—这个下半局部总是会在过程上下文执行,但因为是内核线程,其不能拜访用户空间。最重要特点的就是工作队列容许从新调度甚至是睡眠。 通常,在工作队列和软中断/tasklet中作出抉择非常容易。可应用以下规定: – 如果推后执行的工作须要睡眠,那么只能抉择工作队列。 – 如果推后执行的工作须要延时指定的工夫再触发,那么应用工作队列,因为其能够利用timer延时(内核定时器实现)。 – 如果推后执行的工作须要在一个tick之内解决,则应用软中断或tasklet,因为其能够抢占一般过程和内核线程,同时不可睡眠。 – 如果推后执行的工作对提早的工夫没有任何要求,则应用工作队列,此时通常为无关紧要的工作。 实际上,工作队列的实质就是将工作交给内核线程解决,因而其能够用内核线程替换。然而内核线程的创立和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以咱们也举荐应用工作队列。

相干数据结构

  • 失常工作构造体
struct work_struct {
    atomic_long_t data; //传递给工作函数的参数
#define WORK_STRUCT_PENDING 0       /* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
    struct list_head entry; //链表构造,链接同一工作队列上的工作。
    work_func_t func; //工作函数,用户自定义实现
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};
//工作队列执行函数的原型:
void (*work_func_t)(struct work_struct *work);
//该函数会由一个工作者线程执行,因而其在过程上下文中,能够睡眠也能够中断。但只能在内核中运行,无法访问用户空间。
  • 提早工作构造体(提早的实现是在调度时提早插入相应的工作队列)
struct delayed_work {
    struct work_struct work;
    struct timer_list timer; //定时器,用于实现提早解决
};
  • 工作队列构造体
struct workqueue_struct {
    struct cpu_workqueue_struct *cpu_wq; //指针数组,其每个元素为per-cpu的工作队列
    struct list_head list;
    const char *name;
    int singlethread; //标记是否只创立一个工作者线程
    int freezeable;     /* Freeze threads during suspend */
    int rt;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};
  • 每cpu工作队列(每cpu都对应一个工作者线程worker_thread)
struct cpu_workqueue_struct {
    spinlock_t lock;
    struct list_head worklist;
    wait_queue_head_t more_work;
    struct work_struct *current_work;
    struct workqueue_struct *wq;
    struct task_struct *thread;
} ____cacheline_aligned;

相干API

  • 缺省工作队列
动态创立 
DECLARE_WORK(name,function); //定义失常执行的工作项
DECLARE_DELAYED_WORK(name,function);//定义延后执行的工作项

动态创建
INIT_WORK(_work, _func) //创立失常执行的工作项
INIT_DELAYED_WORK(_work, _func)//创立延后执行的工作项

调度默认工作队列
int schedule_work(struct work_struct *work)

//对失常执行的工作进行调度,即把给定工作的处理函数提交给缺省的工作队列和工作者线程。工作者线程实质上是一个一般的内核线程,在默认状况下,每个CPU均有一个类型为“events”的工作者线程,当调用schedule_work时,这个工作者线程会被唤醒去执行工作链表上的所有工作。

零碎默认的工作队列名称是:keventd_wq,默认的工作者线程叫:events/n,这里的n是处理器的编号,每个处理器对应一个线程。比方,单处理器的零碎只有events/0这样一个线程。而双处理器的零碎就会多一个events/1线程。
默认的工作队列和工作者线程由内核初始化时创立:
start_kernel()-->rest_init-->do_basic_setup-->init_workqueues

调度提早工作
int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay)

刷新缺省工作队列
void flush_scheduled_work(void)
//此函数会始终期待,直到队列中的所有工作都被执行。

勾销提早工作
static inline int cancel_delayed_work(struct delayed_work *work)
//flush_scheduled_work并不勾销任何提早执行的工作,因而,如果要勾销提早工作,应该调用cancel_delayed_work。

以上均是采纳缺省工作者线程来实现工作队列,其长处是简略易用,毛病是如果缺省工作队列负载太重,执行效率会很低,这就须要咱们创立本人的工作者线程和工作队列。

  • 自定义工作队列
create_workqueue(name) 
//宏定义 返回值为工作队列,name为工作线程名称。创立新的工作队列和相应的工作者线程,name用于该内核线程的命名。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)
//相似于schedule_work,区别在于queue_work把给定工作提交给创立的工作队列wq而不是缺省队列。

int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
//调度提早工作。

void flush_workqueue(struct workqueue_struct *wq)
//刷新指定工作队列。

void destroy_workqueue(struct workqueue_struct *wq)
//开释创立的工作队列。

实现原理

  1. 工作队列的组织构造 即workqueue_struct、cpu_workqueue_struct与work_struct的关系。 一个工作队列对应一个work_queue_struct,工作队列中每cpu的工作队列由cpu_workqueue_struct示意,而work_struct为其上的具体工作。 关系如下图所示:

2.工作队列的工作过程

  1. 利用实例 linux各个接口的状态(up/down)的音讯须要告诉netdev_chain上感兴趣的模块同时上报用户空间音讯。这里应用的就是工作队列。 具体流程图如下所示:

  1. 是否处于中断中在Linux中是通过preempt_count来判断的,具体如下: 在linux零碎的过程数据结构里,有这么一个数据结构: #define preempt_count() (current_thread_info()->preempt_count) 利用preempt_count能够示意是否处于中断解决或者软件中断处理过程中,如下所示: # define hardirq_count() (preempt_count() & HARDIRQ_MASK) #define softirq_count() (preempt_count() & SOFTIRQ_MASK) #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK)) #define in_irq() (hardirq_count()) #define in_softirq() (softirq_count()) #define in_interrupt() (irq_count())

preempt_count的8~23位记录中断解决和软件中断处理过程的计数。如果有计数,示意零碎在硬件中断或者软件中断处理过程中。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理