VPP是多线程模型,共享地址空间,最快的通信机制就是直接访问彼此之间的数据。VPP自己实现了一套简单的线程安全机制,用于保护临界区。
VPP多线程之间同步采用的是类似于带信号和超时机制的自旋锁,主要有check、sync、release操作。
总体上类似于pthread_cond_timedwait中的互斥体改成自旋锁所提供的功能,超过BARRIER_SYNC_TIMEOUT时间的话说明可能发生死锁故直接abort。
其中:
- [ ] vlib_worker_thread_barrier_check类似于pthread_cond_wait操作,等待vlib_worker_threads->wait_at_barrier条件。
- [ ] vlib_worker_thread_barrier_sync类似于spin_lock操作,置位vlib_worker_threads->workers_at_barrier。只有主线程可以调用该函数,通知其它线程准备同步。
- [ ] vlib_worker_thread_barrier_release类似于spin_unlock操作,复位vlib_worker_threads->workers_at_barrier。只有主线程可以调用该函数,通知其它线程同步结束。
vpp_main线程访问vpp_worker线程的数据的保护机制
数据结构
vlib_worker_thread_t
typedef struct{ ...... volatile u32 *wait_at_barrier;/* 通知work线程开始等待sync标志,main线程开启sync,设置为1,结束设置为0 */ volatile u32 *workers_at_barrier;/* 统计已经进入sync的worker线程的个数,由worker线程加1 */ i64 recursion_level;/* 当前递归深度 */ u64 barrier_sync_count;/* 当前多少个线程已经同步了,当该值等于work线程数时,开始执行临界区操作 */ u8 barrier_elog_enabled; const char *barrier_caller;/* 开启本次sync的函数名字 */ const char *barrier_context;} vlib_worker_thread_t;
vlib_main_t
typedef struct vlib_main_t{ ...... /* debugging */ volatile int parked_at_barrier; /* * Barrier epoch - Set to current time, each time barrier_sync or * barrier_release is called with zero recursion. * 用于计算sync持续时间 */ f64 barrier_epoch; /* Earliest barrier can be closed again */ /* 当前时间小于barrier_no_close_before,不允许启动sync */ f64 barrier_no_close_before; ......} vlib_main_t;
相关函数分析
- [ ] vlib_worker_thread_barrier_sync
main线程调用该函数通知worker线程开始sync,等待所有worker线程进入sync状态后,执行临界操作。
#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);}voidvlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name){ f64 deadline; f64 now; f64 t_entry; f64 t_open; f64 t_closed; u32 count; if (vec_len (vlib_mains) < 2) return; /* 只有主线程能够调用该函数 */ ASSERT (vlib_get_thread_index () == 0); /* vlib_worker_threads[0]为主线程,记录调用该函数的名字 */ vlib_worker_threads[0].barrier_caller = func_name; count = vec_len (vlib_mains) - 1;/* 工作线程个数 */ /* Record entry relative to last close */ now = vlib_time_now (vm); t_entry = now - vm->barrier_epoch; /* Tolerate recursive calls,递归深度,非首次调用直接返回 */ if (++vlib_worker_threads[0].recursion_level > 1) { barrier_trace_sync_rec (t_entry); return; } /* 发起sync次数统计 */ vlib_worker_threads[0].barrier_sync_count++; /* Enforce minimum barrier open time to minimize packet loss */ /* 再次发起sync,必须在禁止其外,每次sync完成后,在指定时间内不能发起第二次sync */ ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT)); while (1) { now = vlib_time_now (vm); /* Barrier hold-down timer expired? */ if (now >= vm->barrier_no_close_before) break; if ((vm->barrier_no_close_before - now) > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT)) { clib_warning ("clock change: would have waited for %.4f seconds", (vm->barrier_no_close_before - now)); break; } } /* Record time of closure */ /* 两次启动sync的间隔时间,即open时间 */ t_open = now - vm->barrier_epoch; vm->barrier_epoch = now; /* 最大时间,debug版本下600秒,其它情况下1秒 */ deadline = now + BARRIER_SYNC_TIMEOUT; /* 设置wait_at_barrier值为1,通知worker */ *vlib_worker_threads->wait_at_barrier = 1; /* 等待所有的工作者线程就绪 */ while (*vlib_worker_threads->workers_at_barrier != count) { /* 超时直接打印os panic */ if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__); os_panic (); } } /* 从开始启动sync过程到所有work线程接受sync的时间 */ t_closed = now - vm->barrier_epoch; barrier_trace_sync (t_entry, t_open, t_closed);}
- [ ] vlib_worker_thread_barrier_release
main线程处理完临界区操作后,调用该函数通知worker线程sync过程结束。
/* sync过程结束函数*/voidvlib_worker_thread_barrier_release (vlib_main_t * vm){ f64 deadline; f64 now; f64 minimum_open; f64 t_entry; f64 t_closed_total; f64 t_update_main = 0.0; int refork_needed = 0; if (vec_len (vlib_mains) < 2) return; ASSERT (vlib_get_thread_index () == 0); now = vlib_time_now (vm); /* 一对sync与release调用时间段 */ t_entry = now - vm->barrier_epoch; /* 减少递归深度,如果大于0表示sync还没结束 */ if (--vlib_worker_threads[0].recursion_level > 0) { barrier_trace_release_rec (t_entry); return; } ...... deadline = now + BARRIER_SYNC_TIMEOUT; /* * Note when we let go of the barrier. * Workers can use this to derive a reasonably accurate * time offset. See vlib_time_now(...) */ vm->time_last_barrier_release = vlib_time_now (vm); CLIB_MEMORY_STORE_BARRIER (); /* 清除等待标志 */ *vlib_worker_threads->wait_at_barrier = 0; /* 等待所有的works线程退出 */ while (*vlib_worker_threads->workers_at_barrier > 0) { /* 时间太长,打印panic */ if ((now = vlib_time_now (vm)) > deadline) { fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__); os_panic (); } } ...... /* 整个sync持续时间 */ t_closed_total = now - vm->barrier_epoch; /* 计算下一次sync最少需要休息多久才能启动,与本次sync耗时正相关 */ minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR; if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT) { minimum_open = BARRIER_MINIMUM_OPEN_LIMIT; } /* 设置下次sync的最早时间 */ vm->barrier_no_close_before = now + minimum_open; /* Record barrier epoch (used to enforce minimum open time) */ /* 更新epoch时间 */ vm->barrier_epoch = now; barrier_trace_release (t_entry, t_closed_total, t_update_main);}
vlib_worker_thread_barrier_sync和vlib_worker_thread_barrier_release函数只能由main线程成对使用,可以支持嵌套调用。用于实现main线程访问worker线程的数据,效率较差。
- [ ] vlib_worker_thread_barrier_check*
vpp_main线程启动sync后,worker线程需要调用该函数等待。
static inline voidvlib_worker_thread_barrier_check (void){ /* 如果main线程已经启动了sync过程,则本线程需要进入sync状态 */ if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier)) { vlib_main_t *vm = vlib_get_main (); u32 thread_index = vm->thread_index; f64 t = vlib_time_now (vm); ...... /* 等待线程数加1 */ clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1); if (CLIB_DEBUG > 0) { vm = vlib_get_main (); vm->parked_at_barrier = 1; } /* 自旋等待sync结束 */ while (*vlib_worker_threads->wait_at_barrier); /* * Recompute the offset from thread-0 time. * Note that vlib_time_now adds vm->time_offset, so * clear it first. Save the resulting idea of "now", to * see how well we're doing. See show_clock_command_fn(...) */ { f64 now; vm->time_offset = 0.0; now = vlib_time_now (vm); vm->time_offset = vlib_global_main.time_last_barrier_release - now; vm->time_last_barrier_release = vlib_time_now (vm); } if (CLIB_DEBUG > 0) vm->parked_at_barrier = 0; /* sync已经结束,将等待线程数减掉1 */ clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1); ...... }}
线程互斥机制使用示例
我们以命令“ set interface rx-placement”的主要函数:vnet_hw_interface_assign_rx_thread为例进行展示:
main线程
/* main线程收到命令后,最终会调用该函数 */voidvnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index, u16 queue_id, uword thread_index){ vnet_device_main_t *vdm = &vnet_device_main; vlib_main_t *vm, *vm0; vnet_device_input_runtime_t *rt; vnet_device_and_queue_t *dq; vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index); ASSERT (hw->input_node_index > 0); if (vdm->first_worker_thread_index == 0) thread_index = 0; if (thread_index != 0 && (thread_index < vdm->first_worker_thread_index || thread_index > vdm->last_worker_thread_index)) { thread_index = vdm->next_worker_thread_index++; if (vdm->next_worker_thread_index > vdm->last_worker_thread_index) vdm->next_worker_thread_index = vdm->first_worker_thread_index; } vm = vlib_mains[thread_index]; vm0 = vlib_get_main ();/* 本线程,一般是主线程 */ /* 通知worker线程,开始sync */ vlib_worker_thread_barrier_sync (vm0); rt = vlib_node_get_runtime_data (vm, hw->input_node_index); vec_add2 (rt->devices_and_queues, dq, 1); dq->hw_if_index = hw_if_index; dq->dev_instance = hw->dev_instance; dq->queue_id = queue_id; dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING; rt->enabled_node_state = VLIB_NODE_STATE_POLLING; vnet_device_queue_update (vnm, rt); vec_validate (hw->input_node_thread_index_by_queue, queue_id); vec_validate (hw->rx_mode_by_queue, queue_id); hw->input_node_thread_index_by_queue[queue_id] = thread_index; hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING; /* 通知worker线程,sync结束 */ vlib_worker_thread_barrier_release (vm0); vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);}
work线程
/* 参数is_main决定是主线程还是worker线程 */static_always_inline voidvlib_main_or_worker_loop (vlib_main_t * vm, int is_main){ ...... while (1) { vlib_node_runtime_t *n; /* 存在需要处理的rpc请求,处理 */ if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0)) { if (!is_main)/* 只有work线程才会发送rpc请求 */ vl_api_send_pending_rpc_requests (vm); } if (!is_main)/* worker线程 */ { /* 与main线程进行互斥,如果main线程进入了临界区的话,自旋等待 */ vlib_worker_thread_barrier_check (); ...... } ...... vlib_increment_main_loop_counter (vm); /* Record time stamp in case there are no enabled nodes and above calls do not update time stamp. */ cpu_time_now = clib_cpu_time_now (); }}
vpp_worker线程通知vpp_main线程的处理数据机制-RPC
VPP的rpc机制通过API机制实现的,在api机制中注册了两个api:
#define foreach_rpc_api_msg \_(RPC_CALL,rpc_call) \_(RPC_CALL_REPLY,rpc_call_reply)
数据结构
- [ ] vlib_main_t
typedef struct vlib_main_t{ ...... /* RPC requests, main thread only */ uword *pending_rpc_requests; /* 线程准备发送给vpp_main线程处理的rpc */ uword *processing_rpc_requests; /* vpp_main线程正在处理的rpc数组 */ clib_spinlock_t pending_rpc_lock; /* 保护上面两个数组的自旋锁 */} vlib_main_t;
- [ ] vl_api_rpc_call_t
rpc的api传递的请求消息
#ifndef _vl_api_defined_rpc_call#define _vl_api_defined_rpc_calltypedef VL_API_PACKED(struct _vl_api_rpc_call { u16 _vl_msg_id;/* 消息id */ u32 client_index;/* 不需要该索引,因为这个api是内部的 */ u32 context; u64 function;/* rpc函数 */ u8 multicast; u8 need_barrier_sync;/* 是否需要进行互斥保护 */ u8 send_reply;/* 是否发送应答,一般不发送应答 */ u32 data_len; u8 data[0];}) vl_api_rpc_call_t;#endif
相关函数分析
RPC api执行函数
static voidvl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp){ vl_api_rpc_call_reply_t *rmp; int (*fp) (void *); i32 rv = 0; vlib_main_t *vm = vlib_get_main (); if (mp->function == 0)/* 用户的rpc函数为空,输出waring */ { rv = -1; clib_warning ("rpc NULL function pointer"); } else { if (mp->need_barrier_sync)/* 如果需要互斥,则进行保护 */ vlib_worker_thread_barrier_sync (vm); fp = uword_to_pointer (mp->function, int (*)(void *));/* 转换成函数地址 */ rv = fp (mp->data);/* 执行函数 */ if (mp->need_barrier_sync) vlib_worker_thread_barrier_release (vm); } if (mp->send_reply)/* 如果需要发送应答,则发送应答给客户端,一般不需要发送应答 */ { svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index); if (q) { rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp)); rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY); rmp->context = mp->context; rmp->retval = rv; vl_msg_api_send_shmem (q, (u8 *) & rmp); } } if (mp->multicast) { clib_warning ("multicast not yet implemented..."); }}/* 应答处理函数,没有实现 */static voidvl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp){ clib_warning ("unimplemented");}
发起一次RPC
/* 通知main_thread线程执行我们的函数,通知者可以是worker线程也可以是main线程。** force_rpc:表示强制使用rpc模式,即不直接调用我们指定的函数,让对应的协程去执行** worker线程调用该函数时,必须设置为1。main线程可以设置也可以不设置*/always_inline voidvl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length, u8 force_rpc){ vl_api_rpc_call_t *mp; vlib_main_t *vm_global = &vlib_global_main; vlib_main_t *vm = vlib_get_main (); /* Main thread and not a forced RPC: call the function directly */ /* main线程没有设置force_rpc标志,那就直接执行,不放入协程 */ if ((force_rpc == 0) && (vlib_get_thread_index () == 0)) { void (*call_fp) (void *); vlib_worker_thread_barrier_sync (vm); call_fp = fp; call_fp (data); vlib_worker_thread_barrier_release (vm); return; } /* Otherwise, actually do an RPC */ /* 进行一次rpc,分配rpc通信消息结构,使用的是共享内存 */ mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length); clib_memset (mp, 0, sizeof (*mp)); clib_memcpy_fast (mp->data, data, data_length); /* 第一个成员必须是消息id,api机制需要这个。内嵌的消息,非插件api,不需要模块基础消息id。 */ mp->_vl_msg_id = ntohs (VL_API_RPC_CALL); mp->function = pointer_to_uword (fp); mp->need_barrier_sync = 1; /* Add to the pending vector. Thread 0 requires locking. */ /* main线程的pending_rpc_requests向量是临界区,需要进行保护 ** 其它线程pending_rpc_requests自己读占,不需要保护 */ if (vm == vm_global) clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock); vec_add1 (vm->pending_rpc_requests, (uword) mp); if (vm == vm_global) clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);}/* * Check if called from worker threads. * If so, make rpc call of fp through shmem. * Otherwise, call fp directly */voidvl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length){ vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 0);}/* * Always make rpc call of fp through shmem, useful for calling from threads * not setup as worker threads, such as DPDK callback thread * 强制main线程通过共享内存进行rpc调用,不直接调用 */voidvl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length){ vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */ 1);}main线程中的协程还可以使用函数vlib_rpc_call_main_thread发起RPCvoid *rpc_call_main_thread_cb_fn;voidvlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size){ /* 全局函数指针,在初始化的时候其值被设置为vl_api_rpc_call_main_thread函数的地址 */ if (rpc_call_main_thread_cb_fn) { void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn; (*fp) (callback, args, arg_size); } else clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");}
worker线程将本线程发起的RPC转交给main线程
/* worker线程将收集的rpc请求从自己的pending_rpc_requests中转移到main线程的pending_rpc_requests */voidvl_api_send_pending_rpc_requests (vlib_main_t * vm){ vlib_main_t *vm_global = &vlib_global_main; ASSERT (vm != vm_global); clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock); vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests); vec_reset_length (vm->pending_rpc_requests); clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);}
- [ ] vlib_main_or_worker_loop
只有worker线程才需要将RPC请求转移到main线程。
/* 参数is_main决定是主线程还是worker线程 */static_always_inline voidvlib_main_or_worker_loop (vlib_main_t * vm, int is_main){ while (1) { vlib_node_runtime_t *n; /* woerk线程将本线程收集的rpc请求转交给main线程 */ if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0)) { if (!is_main)/* 只有work线程才会将自己发起的rpc请求转移到main线程 */ vl_api_send_pending_rpc_requests (vm); } ...... vlib_increment_main_loop_counter (vm); /* Record time stamp in case there are no enabled nodes and above calls do not update time stamp. */ cpu_time_now = clib_cpu_time_now (); }}
协程处理RPC
RPC处理是在协程"api-rx-from-ring",这个协程也是处理api的协程。
/* *INDENT-OFF* */VLIB_REGISTER_NODE (vl_api_clnt_node) ={ .function = vl_api_clnt_process, .type = VLIB_NODE_TYPE_PROCESS, .name = "api-rx-from-ring", .state = VLIB_NODE_STATE_DISABLED,};
协程主函数vl_api_clnt_process
static uwordvl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node, vlib_frame_t * f){ ...... /* $$$ pay attention to frame size, control CPU usage */ while (1) { /* * There's a reason for checking the queue before * sleeping. If the vlib application crashes, it's entirely * possible for a client to enqueue a connect request * during the process restart interval. * * Unless some force of physics causes the new incarnation * of the application to process the request, the client will * sit and wait for Godot... */ vector_rate = vlib_last_vector_length_per_node (vm); start_time = vlib_time_now (vm); while (1) { if (vl_mem_api_handle_rpc (vm, node)/* 执行协程请求 */ || vl_mem_api_handle_msg_main (vm, node))/* 执行api请求 */ { vm->api_queue_nonempty = 0; VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0); sleep_time = 20.0; break; } ...... } ...... } return 0;}intvl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node){ api_main_t *am = &api_main; int i; uword *tmp, mp; /* * Swap pending and processing vectors, then process the RPCs * Avoid deadlock conditions by construction. * 将等待处理的人rpc请求转移到局部变量tmp。避免临界时间太长。 */ clib_spinlock_lock_if_init (&vm->pending_rpc_lock); tmp = vm->processing_rpc_requests; vec_reset_length (tmp); vm->processing_rpc_requests = vm->pending_rpc_requests; vm->pending_rpc_requests = tmp; clib_spinlock_unlock_if_init (&vm->pending_rpc_lock); /* * RPCs are used to reflect function calls to thread 0 * when the underlying code is not thread-safe. * * Grabbing the thread barrier across a set of RPCs * greatly increases efficiency, and avoids * running afoul of the barrier sync holddown timer. * The barrier sync code supports recursive locking. * * We really need to rewrite RPC-based code... */ if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests))) { vl_msg_api_barrier_sync (); for (i = 0; i < vec_len (vm->processing_rpc_requests); i++)/* 循环处理每一个rpc */ { mp = vm->processing_rpc_requests[i]; vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node); } vl_msg_api_barrier_release (); } return 0;}/* This is only to be called from a vlib/vnet app */voidvl_msg_api_handler_with_vm_node (api_main_t * am, void *the_msg, vlib_main_t * vm, vlib_node_runtime_t * node){ u16 id = ntohs (*((u16 *) the_msg));/* 获取消息id,传递的消息第一个成员就是消息id */ u8 *(*handler) (void *, void *, void *); u8 *(*print_fp) (void *, void *); ...... /* 根据消息id获取对应的执行函数,即VL_API_RPC_CALL对应的函数vl_api_rpc_call_t_handler */ if (id < vec_len (am->msg_handlers) && am->msg_handlers[id]) { handler = (void *) am->msg_handlers[id]; if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled)) vl_msg_api_trace (am, am->rx_trace, the_msg); if (PREDICT_FALSE (am->msg_print_flag)) { fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]); print_fp = (void *) am->msg_print_handlers[id]; if (print_fp == 0) { fformat (stdout, " [no registered print fn for msg %d]\n", id); } else { (*print_fp) (the_msg, vm); } } if (!am->is_mp_safe[id]) { vl_msg_api_barrier_trace_context (am->msg_names[id]); vl_msg_api_barrier_sync (); } /* 执行函数vl_api_rpc_call_t_handler */ (*handler) (the_msg, vm, node); if (!am->is_mp_safe[id]) vl_msg_api_barrier_release (); } else { clib_warning ("no handler for msg id %d", id); } /* * Special-case, so we can e.g. bounce messages off the vnet * main thread without copying them... */ if (!(am->message_bounce[id])) vl_msg_api_free (the_msg); ......}