

VPP 是多线程模型,共享地址空间,最快的通信机制就是直接访问彼此之间的数据。VPP 自己实现了一套简单的线程安全机制,用于保护临界区。

VPP 多线程之间同步采用的是类似于带信号和超时机制的自旋锁,主要有 check、sync、release 操作。
总体上类似于 pthread_cond_timedwait 中的互斥体改成自旋锁所提供的功能,超过 BARRIER_SYNC_TIMEOUT 时间的话说明可能发生死锁故直接 abort。

  • [] vlib_worker_thread_barrier_check 类似于 pthread_cond_wait 操作,等待 vlib_worker_threads->wait_at_barrier 条件。
  • [] vlib_worker_thread_barrier_sync 类似于 spin_lock 操作,置位 vlib_worker_threads->workers_at_barrier。只有主线程可以调用该函数,通知其它线程准备同步。
  • [] vlib_worker_thread_barrier_release 类似于 spin_unlock 操作,复位 vlib_worker_threads->workers_at_barrier。只有主线程可以调用该函数,通知其它线程同步结束。

vpp_main 线程访问 vpp_worker 线程的数据的保护机制



typedef struct
    volatile u32 *wait_at_barrier;/* 通知 work 线程开始等待 sync 标志,main 线程开启 sync,设置为 1,结束设置为 0 */
    volatile u32 *workers_at_barrier;/* 统计已经进入 sync 的 worker 线程的个数,由 worker 线程加 1 */
    i64 recursion_level;/* 当前递归深度 */
    u64 barrier_sync_count;/* 当前多少个线程已经同步了,当该值等于 work 线程数时,开始执行临界区操作 */
    u8 barrier_elog_enabled;
    const char *barrier_caller;/* 开启本次 sync 的函数名字 */
    const char *barrier_context;
} vlib_worker_thread_t;


typedef struct vlib_main_t
    /* debugging */
    volatile int parked_at_barrier;

     * Barrier epoch - Set to current time, each time barrier_sync or
     * barrier_release is called with zero recursion.
     * 用于计算 sync 持续时间
    f64 barrier_epoch;

    /* Earliest barrier can be closed again */
    /* 当前时间小于 barrier_no_close_before,不允许启动 sync */
    f64 barrier_no_close_before;
} vlib_main_t;


  • [] vlib_worker_thread_barrier_sync

main 线程调用该函数通知 worker 线程开始 sync,等待所有 worker 线程进入 sync 状态后,执行临界操作。

#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);}

vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
    f64 deadline;
    f64 now;
    f64 t_entry;
    f64 t_open;
    f64 t_closed;
    u32 count;

    if (vec_len (vlib_mains) < 2)
    /* 只有主线程能够调用该函数 */
    ASSERT (vlib_get_thread_index () == 0);
    /* vlib_worker_threads[0] 为主线程,记录调用该函数的名字 */
    vlib_worker_threads[0].barrier_caller = func_name;
    count = vec_len (vlib_mains) - 1;/* 工作线程个数 */

    /* Record entry relative to last close */
    now = vlib_time_now (vm);
    t_entry = now - vm->barrier_epoch;

    /* Tolerate recursive calls,递归深度,非首次调用直接返回 */
    if (++vlib_worker_threads[0].recursion_level > 1)
    {barrier_trace_sync_rec (t_entry);
    /* 发起 sync 次数统计 */

    /* Enforce minimum barrier open time to minimize packet loss */
    /* 再次发起 sync,必须在禁止其外,每次 sync 完成后,在指定时间内不能发起第二次 sync */
    ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));

    while (1)
    {now = vlib_time_now (vm);
        /* Barrier hold-down timer expired? */
        if (now >= vm->barrier_no_close_before)
        if ((vm->barrier_no_close_before - now)
                > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
            clib_warning ("clock change: would have waited for %.4f seconds",
                          (vm->barrier_no_close_before - now));
    /* Record time of closure */
    /* 两次启动 sync 的间隔时间,即 open 时间 */
    t_open = now - vm->barrier_epoch;
    vm->barrier_epoch = now;
    /* 最大时间,debug 版本下 600 秒,其它情况下 1 秒 */
    deadline = now + BARRIER_SYNC_TIMEOUT;
    /* 设置 wait_at_barrier 值为 1,通知 worker */
    *vlib_worker_threads->wait_at_barrier = 1;
    /* 等待所有的工作者线程就绪 */
    while (*vlib_worker_threads->workers_at_barrier != count)
        /* 超时直接打印 os       panic */
        if ((now = vlib_time_now (vm)) > deadline)
        {fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
            os_panic ();}
    /* 从开始启动 sync 过程到所有 work 线程接受 sync 的时间 */
    t_closed = now - vm->barrier_epoch;

    barrier_trace_sync (t_entry, t_open, t_closed);

  • [] vlib_worker_thread_barrier_release

main 线程处理完临界区操作后,调用该函数通知 worker 线程 sync 过程结束。

/* sync 过程结束函数 */
vlib_worker_thread_barrier_release (vlib_main_t * vm)
    f64 deadline;
    f64 now;
    f64 minimum_open;
    f64 t_entry;
    f64 t_closed_total;
    f64 t_update_main = 0.0;
    int refork_needed = 0;

    if (vec_len (vlib_mains) < 2)

    ASSERT (vlib_get_thread_index () == 0);

    now = vlib_time_now (vm);
    /* 一对 sync 与 release 调用时间段 */
    t_entry = now - vm->barrier_epoch;
    /* 减少递归深度,如果大于 0 表示 sync 还没结束 */
    if (--vlib_worker_threads[0].recursion_level > 0)
    {barrier_trace_release_rec (t_entry);


    deadline = now + BARRIER_SYNC_TIMEOUT;

     * Note when we let go of the barrier.
     * Workers can use this to derive a reasonably accurate
     * time offset. See vlib_time_now(...)
    vm->time_last_barrier_release = vlib_time_now (vm);
    /* 清除等待标志 */
    *vlib_worker_threads->wait_at_barrier = 0;
    /* 等待所有的 works 线程退出 */
    while (*vlib_worker_threads->workers_at_barrier > 0)
        /* 时间太长,打印 panic */
        if ((now = vlib_time_now (vm)) > deadline)
        {fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
            os_panic ();}

    /* 整个 sync 持续时间 */
    t_closed_total = now - vm->barrier_epoch;
    /* 计算下一次 sync 最少需要休息多久才能启动,与本次 sync 耗时正相关 */
    minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;

    if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;}
    /* 设置下次 sync 的最早时间 */
    vm->barrier_no_close_before = now + minimum_open;

    /* Record barrier epoch (used to enforce minimum open time) */
    /* 更新 epoch 时间 */
    vm->barrier_epoch = now;

    barrier_trace_release (t_entry, t_closed_total, t_update_main);


vlib_worker_thread_barrier_sync 和 vlib_worker_thread_barrier_release 函数只能由 main 线程成对使用,可以支持嵌套调用。用于实现 main 线程访问 worker 线程的数据,效率较差。

  • [] vlib_worker_thread_barrier_check*

vpp_main 线程启动 sync 后,worker 线程需要调用该函数等待。

static inline void
vlib_worker_thread_barrier_check (void)
    /* 如果 main 线程已经启动了 sync 过程,则本线程需要进入 sync 状态 */
    if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {vlib_main_t *vm = vlib_get_main ();
        u32 thread_index = vm->thread_index;
        f64 t = vlib_time_now (vm);
        /* 等待线程数加 1 */
        clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
        if (CLIB_DEBUG > 0)
        {vm = vlib_get_main ();
            vm->parked_at_barrier = 1;
        /* 自旋等待 sync 结束 */
        while (*vlib_worker_threads->wait_at_barrier);

         * Recompute the offset from thread-0 time.
         * Note that vlib_time_now adds vm->time_offset, so
         * clear it first. Save the resulting idea of "now", to
         * see how well we're doing. See show_clock_command_fn(...)
            f64 now;
            vm->time_offset = 0.0;
            now = vlib_time_now (vm);
            vm->time_offset = vlib_global_main.time_last_barrier_release - now;
            vm->time_last_barrier_release = vlib_time_now (vm);

        if (CLIB_DEBUG > 0)
            vm->parked_at_barrier = 0;
        /* sync 已经结束,将等待线程数减掉 1 */
        clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);


我们以命令“set interface rx-placement”的主要函数:vnet_hw_interface_assign_rx_thread 为例进行展示:

main 线程

/* main 线程收到命令后,最终会调用该函数 */
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
    vnet_device_main_t *vdm = &vnet_device_main;
    vlib_main_t *vm, *vm0;
    vnet_device_input_runtime_t *rt;
    vnet_device_and_queue_t *dq;
    vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

    ASSERT (hw->input_node_index > 0);

    if (vdm->first_worker_thread_index == 0)
        thread_index = 0;

    if (thread_index != 0 &&
            (thread_index < vdm->first_worker_thread_index ||
             thread_index > vdm->last_worker_thread_index))
        thread_index = vdm->next_worker_thread_index++;
        if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
            vdm->next_worker_thread_index = vdm->first_worker_thread_index;

    vm = vlib_mains[thread_index];
    vm0 = vlib_get_main ();/* 本线程,一般是主线程 */
    /* 通知 worker 线程,开始 sync */
    vlib_worker_thread_barrier_sync (vm0);

    rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

    vec_add2 (rt->devices_and_queues, dq, 1);
    dq->hw_if_index = hw_if_index;
    dq->dev_instance = hw->dev_instance;
    dq->queue_id = queue_id;
    rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

    vnet_device_queue_update (vnm, rt);
    vec_validate (hw->input_node_thread_index_by_queue, queue_id);
    vec_validate (hw->rx_mode_by_queue, queue_id);
    hw->input_node_thread_index_by_queue[queue_id] = thread_index;
    hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;
    /* 通知 worker 线程,sync 结束 */
    vlib_worker_thread_barrier_release (vm0);

    vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);

work 线程

/* 参数 is_main 决定是主线程还是 worker 线程 */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
    while (1)
        vlib_node_runtime_t *n;

        /* 存在需要处理的 rpc 请求,处理 */
        if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {if (!is_main)/* 只有 work 线程才会发送 rpc 请求 */
                vl_api_send_pending_rpc_requests (vm);

        if (!is_main)/* worker 线程 */
            /* 与 main 线程进行互斥,如果 main 线程进入了临界区的话,自旋等待 */
            vlib_worker_thread_barrier_check ();
        vlib_increment_main_loop_counter (vm);

        /* Record time stamp in case there are no enabled nodes and above
            calls do not update time stamp. */
        cpu_time_now = clib_cpu_time_now ();}

vpp_worker 线程通知 vpp_main 线程的处理数据机制 -RPC

VPP 的 rpc 机制通过 API 机制实现的,在 api 机制中注册了两个 api:

#define foreach_rpc_api_msg                     \
_(RPC_CALL,rpc_call)                            \


  • [] vlib_main_t
typedef struct vlib_main_t
    /* RPC requests, main thread only */
    uword *pending_rpc_requests;      /* 线程准备发送给 vpp_main 线程处理的 rpc */
    uword *processing_rpc_requests;   /* vpp_main 线程正在处理的 rpc 数组 */
    clib_spinlock_t pending_rpc_lock; /* 保护上面两个数组的自旋锁 */
} vlib_main_t;
  • [] vl_api_rpc_call_t

rpc 的 api 传递的请求消息

#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED(struct _vl_api_rpc_call {
    u16 _vl_msg_id;/* 消息 id */
    u32 client_index;/* 不需要该索引,因为这个 api 是内部的 */
    u32 context;
    u64 function;/* rpc 函数 */
    u8 multicast;
    u8 need_barrier_sync;/* 是否需要进行互斥保护 */
    u8 send_reply;/* 是否发送应答,一般不发送应答 */
    u32 data_len;
    u8 data[0];
}) vl_api_rpc_call_t;


RPC api 执行函数

static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
    vl_api_rpc_call_reply_t *rmp;
    int (*fp) (void *);
    i32 rv = 0;
    vlib_main_t *vm = vlib_get_main ();

    if (mp->function == 0)/* 用户的 rpc 函数为空,输出 waring */
        rv = -1;
        clib_warning ("rpc NULL function pointer");

    {if (mp->need_barrier_sync)/* 如果需要互斥,则进行保护 */
            vlib_worker_thread_barrier_sync (vm);

        fp = uword_to_pointer (mp->function, int (*)(void *));/* 转换成函数地址 */
        rv = fp (mp->data);/* 执行函数 */

        if (mp->need_barrier_sync)
            vlib_worker_thread_barrier_release (vm);

    if (mp->send_reply)/* 如果需要发送应答,则发送应答给客户端,一般不需要发送应答 */
    {svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index);
        if (q)
        {rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
            rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
            rmp->context = mp->context;
            rmp->retval = rv;
            vl_msg_api_send_shmem (q, (u8 *) & rmp);
    if (mp->multicast)
    {clib_warning ("multicast not yet implemented...");
/* 应答处理函数,没有实现 */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{clib_warning ("unimplemented");

发起一次 RPC

/* 通知 main_thread 线程执行我们的函数,通知者可以是 worker 线程也可以是 main 线程。** force_rpc:表示强制使用 rpc 模式,即不直接调用我们指定的函数,让对应的协程去执行
**            worker 线程调用该函数时,必须设置为 1。main 线程可以设置也可以不设置
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
                                    u8 force_rpc)
    vl_api_rpc_call_t *mp;
    vlib_main_t *vm_global = &vlib_global_main;
    vlib_main_t *vm = vlib_get_main ();

    /* Main thread and not a forced RPC: call the function directly */
    /* main 线程没有设置 force_rpc 标志,那就直接执行,不放入协程 */
    if ((force_rpc == 0) && (vlib_get_thread_index () == 0))
    {void (*call_fp) (void *);

        vlib_worker_thread_barrier_sync (vm);

        call_fp = fp;
        call_fp (data);

        vlib_worker_thread_barrier_release (vm);

    /* Otherwise, actually do an RPC */
    /* 进行一次 rpc,分配 rpc 通信消息结构,使用的是共享内存 */
    mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);

    clib_memset (mp, 0, sizeof (*mp));
    clib_memcpy_fast (mp->data, data, data_length);
    /* 第一个成员必须是消息 id,api 机制需要这个。内嵌的消息,非插件 api,不需要模块基础消息 id。*/
    mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
    mp->function = pointer_to_uword (fp);
    mp->need_barrier_sync = 1;

    /* Add to the pending vector. Thread 0 requires locking. */
    /* main 线程的 pending_rpc_requests 向量是临界区,需要进行保护
    ** 其它线程 pending_rpc_requests 自己读占,不需要保护
    if (vm == vm_global)
        clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
    vec_add1 (vm->pending_rpc_requests, (uword) mp);
    if (vm == vm_global)
        clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 * Check if called from worker threads.
 * If so, make rpc call of fp through shmem.
 * Otherwise, call fp directly
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
    vl_api_rpc_call_main_thread_inline (fp, data, data_length,    /*force_rpc */

 * Always make rpc call of fp through shmem, useful for calling from threads
 * not setup as worker threads, such as DPDK callback thread
 * 强制 main 线程通过共享内存进行 rpc 调用,不直接调用
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
    vl_api_rpc_call_main_thread_inline (fp, data, data_length,    /*force_rpc */
main 线程中的协程还可以使用函数 vlib_rpc_call_main_thread 发起 RPC
void *rpc_call_main_thread_cb_fn;

vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
    /* 全局函数指针,在初始化的时候其值被设置为 vl_api_rpc_call_main_thread 函数的地址 */
    if (rpc_call_main_thread_cb_fn)
    {void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
        (*fp) (callback, args, arg_size);
        clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");

worker 线程将本线程发起的 RPC 转交给 main 线程

/* worker 线程将收集的 rpc 请求从自己的 pending_rpc_requests 中转移到 main 线程的 pending_rpc_requests */
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
    vlib_main_t *vm_global = &vlib_global_main;

    ASSERT (vm != vm_global);

    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
    vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
    vec_reset_length (vm->pending_rpc_requests);
    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
  • [] vlib_main_or_worker_loop

只有 worker 线程才需要将 RPC 请求转移到 main 线程。

/* 参数 is_main 决定是主线程还是 worker 线程 */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{while (1)
        vlib_node_runtime_t *n;

        /* woerk 线程将本线程收集的 rpc 请求转交给 main 线程 */
        if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
        {if (!is_main)/* 只有 work 线程才会将自己发起的 rpc 请求转移到 main 线程 */
                vl_api_send_pending_rpc_requests (vm);
        vlib_increment_main_loop_counter (vm);

        /* Record time stamp in case there are no enabled nodes and above
            calls do not update time stamp. */
        cpu_time_now = clib_cpu_time_now ();}

协程处理 RPC

RPC 处理是在协程 ”api-rx-from-ring”,这个协程也是处理 api 的协程。

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
    .function = vl_api_clnt_process,
    .name = "api-rx-from-ring",

协程主函数 vl_api_clnt_process

static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
                     vlib_frame_t * f)
    /* $$$ pay attention to frame size, control CPU usage */
    while (1)
         * There's a reason for checking the queue before
         * sleeping. If the vlib application crashes, it's entirely
         * possible for a client to enqueue a connect request
         * during the process restart interval.
         * Unless some force of physics causes the new incarnation
         * of the application to process the request, the client will
         * sit and wait for Godot...
        vector_rate = vlib_last_vector_length_per_node (vm);
        start_time = vlib_time_now (vm);
        while (1)
        {if (vl_mem_api_handle_rpc (vm, node)/* 执行协程请求 */
                    || vl_mem_api_handle_msg_main (vm, node))/* 执行 api 请求 */
                vm->api_queue_nonempty = 0;
                VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
                sleep_time = 20.0;

    return 0;

vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
    api_main_t *am = &api_main;
    int i;
    uword *tmp, mp;

     * Swap pending and processing vectors, then process the RPCs
     * Avoid deadlock conditions by construction.
     * 将等待处理的人 rpc 请求转移到局部变量 tmp。避免临界时间太长。*/
    clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
    tmp = vm->processing_rpc_requests;
    vec_reset_length (tmp);
    vm->processing_rpc_requests = vm->pending_rpc_requests;
    vm->pending_rpc_requests = tmp;
    clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

     * RPCs are used to reflect function calls to thread 0
     * when the underlying code is not thread-safe.
     * Grabbing the thread barrier across a set of RPCs
     * greatly increases efficiency, and avoids
     * running afoul of the barrier sync holddown timer.
     * The barrier sync code supports recursive locking.
     * We really need to rewrite RPC-based code...
    if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {vl_msg_api_barrier_sync ();
        for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)/* 循环处理每一个 rpc */
        {mp = vm->processing_rpc_requests[i];
            vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
        vl_msg_api_barrier_release ();}

    return 0;

/* This is only to be called from a vlib/vnet app */
vl_msg_api_handler_with_vm_node (api_main_t * am,
                                 void *the_msg, vlib_main_t * vm,
                                 vlib_node_runtime_t * node)
{u16 id = ntohs (*((u16 *) the_msg));/* 获取消息 id,传递的消息第一个成员就是消息 id */
    u8 *(*handler) (void *, void *, void *);
    u8 *(*print_fp) (void *, void *);

    /* 根据消息 id 获取对应的执行函数,即 VL_API_RPC_CALL 对应的函数 vl_api_rpc_call_t_handler */
    if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {handler = (void *) am->msg_handlers[id];

        if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
            vl_msg_api_trace (am, am->rx_trace, the_msg);

        if (PREDICT_FALSE (am->msg_print_flag))
        {fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
            print_fp = (void *) am->msg_print_handlers[id];
            if (print_fp == 0)
            {fformat (stdout, "[no registered print fn for msg %d]\n", id);
            {(*print_fp) (the_msg, vm);

        if (!am->is_mp_safe[id])
        {vl_msg_api_barrier_trace_context (am->msg_names[id]);
            vl_msg_api_barrier_sync ();}
        /* 执行函数 vl_api_rpc_call_t_handler */
        (*handler) (the_msg, vm, node);
        if (!am->is_mp_safe[id])
            vl_msg_api_barrier_release ();}
    {clib_warning ("no handler for msg id %d", id);

     * Special-case, so we can e.g. bounce messages off the vnet
     * main thread without copying them...
    if (!(am->message_bounce[id]))
        vl_msg_api_free (the_msg);
