VPP uses a multi-threaded model with a shared address space, so the fastest way for threads to communicate is to read and write each other's data directly. VPP implements its own simple thread-safety mechanism to protect such critical sections.

Synchronization between VPP threads is built on something like a spinlock with signaling and timeout semantics, exposed through three operations: check, sync, and release. Overall it provides what pthread_cond_timedwait would provide if the mutex were replaced by a spinlock; if a wait exceeds BARRIER_SYNC_TIMEOUT, a deadlock is assumed and the process is aborted via os_panic. Specifically (a usage sketch follows the list):

  • vlib_worker_thread_barrier_check is analogous to pthread_cond_wait: a worker waits on the vlib_worker_threads->wait_at_barrier condition.
  • vlib_worker_thread_barrier_sync is analogous to spin_lock: it sets vlib_worker_threads->wait_at_barrier to 1. Only the main thread may call it, to tell the other threads to get ready for the sync.
  • vlib_worker_thread_barrier_release is analogous to spin_unlock: it clears vlib_worker_threads->wait_at_barrier back to 0. Only the main thread may call it, to tell the other threads the sync is over.
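Putting the three operations together, typical usage looks like the following minimal sketch. The barrier calls are the real API; shared_counter, main_thread_update, and worker_loop_body are made up for illustration:

static u32 shared_counter;      /* stand-in for data owned by the workers */

/* Main thread: close the barrier, touch worker-owned data, reopen. */
static void
main_thread_update (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* workers park in _check () */
  shared_counter++;                        /* critical section */
  vlib_worker_thread_barrier_release (vm); /* workers resume */
}

/* Worker thread: the dispatch loop polls the barrier flag on every pass. */
static void
worker_loop_body (void)
{
  vlib_worker_thread_barrier_check ();     /* spins here while a sync is open */
  /* ... normal per-node packet processing ... */
}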

How the vpp_main thread safely accesses vpp_worker thread data

Data structures

vlib_worker_thread_t

typedef struct
{
  ......
  volatile u32 *wait_at_barrier;    /* Flag telling workers to wait at the
                                       barrier: the main thread sets it to 1
                                       when a sync starts, back to 0 when it
                                       ends */
  volatile u32 *workers_at_barrier; /* Number of workers that have entered
                                       the sync; incremented by each worker.
                                       When it equals the worker count, the
                                       critical section may begin */
  i64 recursion_level;              /* Current recursion depth */
  u64 barrier_sync_count;           /* Number of barrier syncs initiated */
  u8 barrier_elog_enabled;
  const char *barrier_caller;       /* Name of the function that started the
                                       current sync */
  const char *barrier_context;
} vlib_worker_thread_t;

vlib_main_t

typedef struct vlib_main_t
{
  ......
  /* debugging */
  volatile int parked_at_barrier;

  /*
   * Barrier epoch - Set to current time, each time barrier_sync or
   * barrier_release is called with zero recursion.
   * Used to measure how long a sync lasts.
   */
  f64 barrier_epoch;

  /* Earliest barrier can be closed again */
  /* While the current time is earlier than barrier_no_close_before,
     starting a new sync is not allowed. */
  f64 barrier_no_close_before;
  ......
} vlib_main_t;
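For reference, the timing constants used with these fields live in vlib/threads.h. The values below match what this article describes (roughly 600 s deadlock timeout in debug builds, 1 s otherwise), but they may differ across VPP releases, so check your tree:

#if CLIB_DEBUG > 0
#define BARRIER_SYNC_TIMEOUT (600.1)     /* seconds before declaring deadlock */
#else
#define BARRIER_SYNC_TIMEOUT (1.0)
#endif

#define BARRIER_MINIMUM_OPEN_LIMIT 50e-6 /* cap on the enforced open interval */
#define BARRIER_MINIMUM_OPEN_FACTOR 3    /* enforced open time is 3x the last
                                            closed time, up to the cap */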

Analysis of the relevant functions

  • vlib_worker_thread_barrier_sync

The main thread calls this function to notify the workers that a sync is starting; once every worker has entered the sync state, it performs the critical-section work.

#define vlib_worker_thread_barrier_sync(X) \
  { vlib_worker_thread_barrier_sync_int (X, __FUNCTION__); }

void
vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
{
  f64 deadline;
  f64 now;
  f64 t_entry;
  f64 t_open;
  f64 t_closed;
  u32 count;

  if (vec_len (vlib_mains) < 2)
    return;

  /* Only the main thread may call this function */
  ASSERT (vlib_get_thread_index () == 0);

  /* vlib_worker_threads[0] is the main thread; record the caller's name */
  vlib_worker_threads[0].barrier_caller = func_name;
  count = vec_len (vlib_mains) - 1; /* number of worker threads */

  /* Record entry relative to last close */
  now = vlib_time_now (vm);
  t_entry = now - vm->barrier_epoch;

  /* Tolerate recursive calls: on a nested call, just bump the depth
     and return */
  if (++vlib_worker_threads[0].recursion_level > 1)
    {
      barrier_trace_sync_rec (t_entry);
      return;
    }

  /* Count the number of syncs initiated */
  vlib_worker_threads[0].barrier_sync_count++;

  /* Enforce minimum barrier open time to minimize packet loss */
  /* A new sync must start outside the hold-down window: after each sync
     completes, no second sync may start for a minimum interval */
  ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));

  while (1)
    {
      now = vlib_time_now (vm);
      /* Barrier hold-down timer expired? */
      if (now >= vm->barrier_no_close_before)
        break;
      if ((vm->barrier_no_close_before - now)
          > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
        {
          clib_warning ("clock change: would have waited for %.4f seconds",
                        (vm->barrier_no_close_before - now));
          break;
        }
    }

  /* Record time of closure */
  /* Interval between two syncs, i.e. the open time */
  t_open = now - vm->barrier_epoch;
  vm->barrier_epoch = now;

  /* Deadline: 600 s in debug builds, 1 s otherwise */
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /* Set wait_at_barrier to 1 to notify the workers */
  *vlib_worker_threads->wait_at_barrier = 1;

  /* Wait for all worker threads to park at the barrier */
  while (*vlib_worker_threads->workers_at_barrier != count)
    {
      /* On timeout, report the deadlock and os_panic */
      if ((now = vlib_time_now (vm)) > deadline)
        {
          fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
          os_panic ();
        }
    }

  /* Time from initiating the sync until all workers acknowledged it */
  t_closed = now - vm->barrier_epoch;

  barrier_trace_sync (t_entry, t_open, t_closed);
}
  • vlib_worker_thread_barrier_release

After finishing the critical-section work, the main thread calls this function to notify the workers that the sync is over.

/* Ends the sync */
void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
  f64 deadline;
  f64 now;
  f64 minimum_open;
  f64 t_entry;
  f64 t_closed_total;
  f64 t_update_main = 0.0;
  int refork_needed = 0;

  if (vec_len (vlib_mains) < 2)
    return;

  ASSERT (vlib_get_thread_index () == 0);

  now = vlib_time_now (vm);
  /* Time spent inside this sync/release pair */
  t_entry = now - vm->barrier_epoch;

  /* Decrement the recursion depth; if it is still > 0 the sync has
     not finished yet */
  if (--vlib_worker_threads[0].recursion_level > 0)
    {
      barrier_trace_release_rec (t_entry);
      return;
    }

  ......

  deadline = now + BARRIER_SYNC_TIMEOUT;

  /*
   * Note when we let go of the barrier.
   * Workers can use this to derive a reasonably accurate
   * time offset. See vlib_time_now(...)
   */
  vm->time_last_barrier_release = vlib_time_now (vm);
  CLIB_MEMORY_STORE_BARRIER ();

  /* Clear the wait flag */
  *vlib_worker_threads->wait_at_barrier = 0;

  /* Wait until every worker has left the barrier */
  while (*vlib_worker_threads->workers_at_barrier > 0)
    {
      /* Taking too long: report the deadlock and panic */
      if ((now = vlib_time_now (vm)) > deadline)
        {
          fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
          os_panic ();
        }
    }

  ......

  /* Total time the barrier stayed closed */
  t_closed_total = now - vm->barrier_epoch;

  /* Compute how long the barrier must stay open before the next sync
     may start; proportional to how long this sync took */
  minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;

  if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {
      minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
    }

  /* Earliest time the next sync may start */
  vm->barrier_no_close_before = now + minimum_open;

  /* Record barrier epoch (used to enforce minimum open time) */
  vm->barrier_epoch = now;

  barrier_trace_release (t_entry, t_closed_total, t_update_main);
}

vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release must be used in pairs, and only by the main thread; nested calls are supported. They let the main thread access worker-thread data, but since every worker is stalled for the duration, this is an expensive operation.
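Because of recursion_level, nesting behaves as in this sketch (do_something_locked and outer_update are hypothetical helpers; only the outermost pair actually closes and reopens the barrier):

static void
do_something_locked (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* level 1 -> 2: early return,   */
  /* ... more critical-section work ... */ /* barrier stays closed          */
  vlib_worker_thread_barrier_release (vm); /* level 2 -> 1: early return    */
}

static void
outer_update (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* level 0 -> 1: barrier closes  */
  do_something_locked (vm);                /* nested pair is a no-op        */
  vlib_worker_thread_barrier_release (vm); /* level 1 -> 0: barrier reopens */
}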

  • vlib_worker_thread_barrier_check

Once vpp_main has started a sync, every worker thread calls this function and waits in it.

static inline void
vlib_worker_thread_barrier_check (void)
{
  /* If the main thread has started a sync, this thread must enter the
     sync state */
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      vlib_main_t *vm = vlib_get_main ();
      u32 thread_index = vm->thread_index;
      f64 t = vlib_time_now (vm);

      ......

      /* Increment the count of waiting workers */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);

      if (CLIB_DEBUG > 0)
        {
          vm = vlib_get_main ();
          vm->parked_at_barrier = 1;
        }

      /* Spin until the sync ends */
      while (*vlib_worker_threads->wait_at_barrier)
        ;

      /*
       * Recompute the offset from thread-0 time.
       * Note that vlib_time_now adds vm->time_offset, so
       * clear it first. Save the resulting idea of "now", to
       * see how well we're doing. See show_clock_command_fn(...)
       */
      {
        f64 now;
        vm->time_offset = 0.0;
        now = vlib_time_now (vm);
        vm->time_offset = vlib_global_main.time_last_barrier_release - now;
        vm->time_last_barrier_release = vlib_time_now (vm);
      }

      if (CLIB_DEBUG > 0)
        vm->parked_at_barrier = 0;

      /* The sync is over: decrement the count of waiting workers */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);

      ......
    }
}

Usage example of the thread mutual-exclusion mechanism

We illustrate with vnet_hw_interface_assign_rx_thread, the main function behind the "set interface rx-placement" command:

Main thread

/* Eventually called on the main thread after it receives the command */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
{
  vnet_device_main_t *vdm = &vnet_device_main;
  vlib_main_t *vm, *vm0;
  vnet_device_input_runtime_t *rt;
  vnet_device_and_queue_t *dq;
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

  ASSERT (hw->input_node_index > 0);

  if (vdm->first_worker_thread_index == 0)
    thread_index = 0;

  if (thread_index != 0 &&
      (thread_index < vdm->first_worker_thread_index ||
       thread_index > vdm->last_worker_thread_index))
    {
      thread_index = vdm->next_worker_thread_index++;
      if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
        vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

  vm = vlib_mains[thread_index];
  vm0 = vlib_get_main (); /* the calling thread, normally main */

  /* Tell the workers a sync is starting */
  vlib_worker_thread_barrier_sync (vm0);

  rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

  vec_add2 (rt->devices_and_queues, dq, 1);
  dq->hw_if_index = hw_if_index;
  dq->dev_instance = hw->dev_instance;
  dq->queue_id = queue_id;
  dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
  rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

  vnet_device_queue_update (vnm, rt);
  vec_validate (hw->input_node_thread_index_by_queue, queue_id);
  vec_validate (hw->rx_mode_by_queue, queue_id);
  hw->input_node_thread_index_by_queue[queue_id] = thread_index;
  hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

  /* Tell the workers the sync is over */
  vlib_worker_thread_barrier_release (vm0);

  vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}

Worker thread

/* The is_main parameter selects main-thread vs. worker behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  ......

  while (1)
    {
      vlib_node_runtime_t *n;

      /* Pending RPC requests? Hand them over. */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
          if (!is_main) /* only worker threads send RPC requests */
            vl_api_send_pending_rpc_requests (vm);
        }

      if (!is_main) /* worker thread */
        {
          /* Synchronize with the main thread: if main has entered the
             critical section, spin here until it leaves */
          vlib_worker_thread_barrier_check ();
          ......
        }

      ......

      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}

RPC: the mechanism by which vpp_worker threads ask the vpp_main thread to process data

VPP's RPC mechanism is implemented on top of its API machinery; two API messages are registered for it:

#define foreach_rpc_api_msg             \
_(RPC_CALL, rpc_call)                   \
_(RPC_CALL_REPLY, rpc_call_reply)
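As a reminder of how such a foreach X-macro list is consumed, here is a generic, self-contained illustration (plain C; not VPP's actual registration code, which additionally wires in handlers, printers, and endian converters):

#include <stdio.h>

#define foreach_rpc_api_msg             \
_(RPC_CALL, rpc_call)                   \
_(RPC_CALL_REPLY, rpc_call_reply)

/* First expansion: generate one enum member per message */
enum
{
#define _(N, n) VL_API_##N,
  foreach_rpc_api_msg
#undef _
};

int
main (void)
{
  /* Second expansion: print each (id, handler-name) pair */
#define _(N, n) printf ("%d -> vl_api_%s_t_handler\n", VL_API_##N, #n);
  foreach_rpc_api_msg
#undef _
  return 0;
}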

Data structures

  • vlib_main_t
typedef struct vlib_main_t
{
  ......
  /* RPC requests, main thread only */
  uword *pending_rpc_requests;      /* RPCs queued for the vpp_main thread to run */
  uword *processing_rpc_requests;   /* RPCs the vpp_main thread is currently processing */
  clib_spinlock_t pending_rpc_lock; /* Spinlock protecting the two vectors above */
} vlib_main_t;
  • vl_api_rpc_call_t

The request message carried by the RPC API:

#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED (struct _vl_api_rpc_call {
  u16 _vl_msg_id;        /* message id */
  u32 client_index;      /* unused: this API is internal only */
  u32 context;
  u64 function;          /* the function to call over RPC */
  u8 multicast;
  u8 need_barrier_sync;  /* whether to wrap the call in a barrier sync */
  u8 send_reply;         /* whether to send a reply (usually not) */
  u32 data_len;
  u8 data[0];
}) vl_api_rpc_call_t;
#endif

Analysis of the relevant functions

The RPC API handler

static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
{
  vl_api_rpc_call_reply_t *rmp;
  int (*fp) (void *);
  i32 rv = 0;
  vlib_main_t *vm = vlib_get_main ();

  if (mp->function == 0) /* NULL user function: warn and bail out */
    {
      rv = -1;
      clib_warning ("rpc NULL function pointer");
    }
  else
    {
      if (mp->need_barrier_sync) /* protect the call with a barrier sync */
        vlib_worker_thread_barrier_sync (vm);

      fp = uword_to_pointer (mp->function, int (*)(void *)); /* back to a pointer */
      rv = fp (mp->data); /* call the function */

      if (mp->need_barrier_sync)
        vlib_worker_thread_barrier_release (vm);
    }

  if (mp->send_reply) /* reply to the client only if requested, which is rare */
    {
      svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index);
      if (q)
        {
          rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
          rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
          rmp->context = mp->context;
          rmp->retval = rv;
          vl_msg_api_send_shmem (q, (u8 *) & rmp);
        }
    }

  if (mp->multicast)
    {
      clib_warning ("multicast not yet implemented...");
    }
}

/* Reply handler: not implemented */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{
  clib_warning ("unimplemented");
}

Initiating an RPC

/* Ask the main thread to run a function on our behalf; the caller may be a
** worker thread or the main thread itself.
** force_rpc: force RPC mode, i.e. do not call the function directly but let
**            the API process node execute it. Worker threads must pass 1;
**            the main thread may pass either value.
*/
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
{
  vl_api_rpc_call_t *mp;
  vlib_main_t *vm_global = &vlib_global_main;
  vlib_main_t *vm = vlib_get_main ();

  /* Main thread and not a forced RPC: call the function directly */
  if ((force_rpc == 0) && (vlib_get_thread_index () == 0))
    {
      void (*call_fp) (void *);

      vlib_worker_thread_barrier_sync (vm);
      call_fp = fp;
      call_fp (data);
      vlib_worker_thread_barrier_release (vm);
      return;
    }

  /* Otherwise, actually do an RPC */
  /* Allocate the RPC message from shared memory */
  mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
  clib_memset (mp, 0, sizeof (*mp));
  clib_memcpy_fast (mp->data, data, data_length);

  /* The first member must be the message id; the API machinery relies on it.
     This is a built-in (non-plugin) message, so no base message id is
     needed. */
  mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
  mp->function = pointer_to_uword (fp);
  mp->need_barrier_sync = 1;

  /* Add to the pending vector. Thread 0 requires locking. */
  /* The main thread's pending_rpc_requests vector is a critical section and
  ** must be protected; each worker's own pending_rpc_requests is private to
  ** that worker and needs no lock.
  */
  if (vm == vm_global)
    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);

  vec_add1 (vm->pending_rpc_requests, (uword) mp);

  if (vm == vm_global)
    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}

/*
 * Check if called from worker threads.
 * If so, make rpc call of fp through shmem.
 * Otherwise, call fp directly
 */
void
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 0);
}

/*
 * Always make rpc call of fp through shmem, useful for calling from threads
 * not setup as worker threads, such as DPDK callback thread
 * Forces the call through shared memory instead of a direct call.
 */
void
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 1);
}

A process node running in the main thread can also initiate an RPC via vlib_rpc_call_main_thread:

void *rpc_call_main_thread_cb_fn;

void
vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
{
  /* Global function pointer, set at initialization time to the address of
     vl_api_rpc_call_main_thread */
  if (rpc_call_main_thread_cb_fn)
    {
      void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
      (*fp) (callback, args, arg_size);
    }
  else
    clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
}
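For completeness, this is roughly what a caller looks like from a worker thread. my_args_t, my_handler, and worker_requests_work are hypothetical; vl_api_rpc_call_main_thread copies the argument bytes into the shared-memory message, so passing a stack variable is fine:

typedef struct
{
  u32 sw_if_index;
} my_args_t;

/* Runs later on the main thread, under barrier sync
   (need_barrier_sync is set to 1 by the RPC path above) */
static int
my_handler (void *arg)
{
  my_args_t *a = arg;
  /* ... touch main-thread-only state for a->sw_if_index ... */
  return 0;
}

/* Called from a worker: queue the call instead of executing it directly */
static void
worker_requests_work (u32 sw_if_index)
{
  my_args_t a = { .sw_if_index = sw_if_index };
  vl_api_rpc_call_main_thread (my_handler, (u8 *) &a, sizeof (a));
}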

A worker thread hands its own pending RPCs to the main thread

/* Move the RPC requests a worker has accumulated from its own
   pending_rpc_requests vector to the main thread's */
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
  vlib_main_t *vm_global = &vlib_global_main;

  ASSERT (vm != vm_global);

  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
  vec_reset_length (vm->pending_rpc_requests);
  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
  • vlib_main_or_worker_loop

Only worker threads need to transfer RPC requests to the main thread.

/* The is_main parameter selects main-thread vs. worker behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  while (1)
    {
      vlib_node_runtime_t *n;

      /* A worker hands the RPC requests it has collected to the main thread */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {
          if (!is_main) /* only workers transfer their own RPCs to main */
            vl_api_send_pending_rpc_requests (vm);
        }

      ......

      vlib_increment_main_loop_counter (vm);

      /* Record time stamp in case there are no enabled nodes and above
         calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}

Processing RPCs in the API process node

RPCs are processed by the "api-rx-from-ring" process node (VPP's cooperatively scheduled coroutine), the same node that handles API messages.

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
{
  .function = vl_api_clnt_process,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "api-rx-from-ring",
  .state = VLIB_NODE_STATE_DISABLED,
};

The process's main function, vl_api_clnt_process

static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_frame_t * f)
{
  ......
  /* $$$ pay attention to frame size, control CPU usage */
  while (1)
    {
      /*
       * There's a reason for checking the queue before
       * sleeping. If the vlib application crashes, it's entirely
       * possible for a client to enqueue a connect request
       * during the process restart interval.
       *
       * Unless some force of physics causes the new incarnation
       * of the application to process the request, the client will
       * sit and wait for Godot...
       */
      vector_rate = vlib_last_vector_length_per_node (vm);
      start_time = vlib_time_now (vm);

      while (1)
        {
          if (vl_mem_api_handle_rpc (vm, node)          /* handle RPC requests */
              || vl_mem_api_handle_msg_main (vm, node)) /* handle API requests */
            {
              vm->api_queue_nonempty = 0;
              VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
              sleep_time = 20.0;
              break;
            }
          ......
        }
      ......
    }
  return 0;
}

int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = &api_main;
  int i;
  uword *tmp, mp;

  /*
   * Swap pending and processing vectors, then process the RPCs
   * Avoid deadlock conditions by construction.
   * Moving the pending RPCs into a local vector keeps the critical
   * section short.
   */
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  tmp = vm->processing_rpc_requests;
  vec_reset_length (tmp);
  vm->processing_rpc_requests = vm->pending_rpc_requests;
  vm->pending_rpc_requests = tmp;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  /*
   * RPCs are used to reflect function calls to thread 0
   * when the underlying code is not thread-safe.
   *
   * Grabbing the thread barrier across a set of RPCs
   * greatly increases efficiency, and avoids
   * running afoul of the barrier sync holddown timer.
   * The barrier sync code supports recursive locking.
   *
   * We really need to rewrite RPC-based code...
   */
  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
      vl_msg_api_barrier_sync ();
      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++) /* handle each RPC in turn */
        {
          mp = vm->processing_rpc_requests[i];
          vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
        }
      vl_msg_api_barrier_release ();
    }

  return 0;
}

/* This is only to be called from a vlib/vnet app */
void
vl_msg_api_handler_with_vm_node (api_main_t * am,
                                 void *the_msg, vlib_main_t * vm,
                                 vlib_node_runtime_t * node)
{
  u16 id = ntohs (*((u16 *) the_msg)); /* message id: always the first member */
  u8 *(*handler) (void *, void *, void *);
  u8 *(*print_fp) (void *, void *);
  ......

  /* Look up the handler registered for this message id, i.e.
     vl_api_rpc_call_t_handler for VL_API_RPC_CALL */
  if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {
      handler = (void *) am->msg_handlers[id];

      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
        vl_msg_api_trace (am, am->rx_trace, the_msg);

      if (PREDICT_FALSE (am->msg_print_flag))
        {
          fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
          print_fp = (void *) am->msg_print_handlers[id];
          if (print_fp == 0)
            {
              fformat (stdout, "  [no registered print fn for msg %d]\n", id);
            }
          else
            {
              (*print_fp) (the_msg, vm);
            }
        }

      if (!am->is_mp_safe[id])
        {
          vl_msg_api_barrier_trace_context (am->msg_names[id]);
          vl_msg_api_barrier_sync ();
        }

      /* Call the handler, vl_api_rpc_call_t_handler */
      (*handler) (the_msg, vm, node);

      if (!am->is_mp_safe[id])
        vl_msg_api_barrier_release ();
    }
  else
    {
      clib_warning ("no handler for msg id %d", id);
    }

  /*
   * Special-case, so we can e.g. bounce messages off the vnet
   * main thread without copying them...
   */
  if (!(am->message_bounce[id]))
    vl_msg_api_free (the_msg);
  ......
}