VPP uses a multi-threaded, shared-address-space model, so the fastest way for threads to communicate is to access each other's data directly. To protect the critical sections this creates, VPP implements its own simple thread-safety mechanism.
Synchronization between VPP threads is essentially a spinlock with signaling and timeout semantics, exposed as three operations: check, sync, and release. Overall it provides what pthread_cond_timedwait would provide if its mutex were replaced by a spinlock; a wait exceeding BARRIER_SYNC_TIMEOUT is taken to mean a probable deadlock, and the process aborts.
Specifically:
- vlib_worker_thread_barrier_check is the pthread_cond_wait analogue: a worker waits on the vlib_worker_threads->wait_at_barrier condition.
- vlib_worker_thread_barrier_sync is the spin_lock analogue: it sets vlib_worker_threads->wait_at_barrier and spins until every worker has checked in via workers_at_barrier. Only the main thread may call it, to tell the other threads to park for synchronization.
- vlib_worker_thread_barrier_release is the spin_unlock analogue: it clears vlib_worker_threads->wait_at_barrier and spins until workers_at_barrier drops back to zero. Only the main thread may call it, to tell the other threads that synchronization is over.
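Taken together, the three operations are used in the pattern sketched below (a minimal illustration built from the functions analyzed in this article; the helper names and the elided processing are illustrative only):

/* Main thread: close the barrier, touch shared state, reopen it. */
void
main_thread_update (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* park all workers */
  /* ... critical section: safely modify data owned by workers ... */
  vlib_worker_thread_barrier_release (vm); /* let the workers run again */
}

/* Each worker calls the check in its dispatch loop; it spins there
 * for as long as the main thread holds the barrier closed. */
void
worker_loop_iteration (void)
{
  vlib_worker_thread_barrier_check ();
  /* ... normal packet processing ... */
}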
How the vpp_main thread protects its access to vpp_worker thread data
Data structures
vlib_worker_thread_t
typedef struct
{
  ......
  volatile u32 *wait_at_barrier;    /* "wait at barrier" flag: the main thread
                                       sets it to 1 when a sync starts and back
                                       to 0 when the sync ends */
  volatile u32 *workers_at_barrier; /* count of workers currently parked at the
                                       barrier; each worker increments it */
  i64 recursion_level;              /* current barrier recursion depth */
  u64 barrier_sync_count;           /* number of barrier syncs initiated so far */
  u8 barrier_elog_enabled;
  const char *barrier_caller;       /* name of the function that opened this sync */
  const char *barrier_context;
} vlib_worker_thread_t;
vlib_main_t
typedef struct vlib_main_t
{
  ......
  /* debugging */
  volatile int parked_at_barrier;
  /*
   * Barrier epoch - Set to current time, each time barrier_sync or
   * barrier_release is called with zero recursion.
   * Used to compute how long a sync lasted.
   */
  f64 barrier_epoch;
  /* Earliest barrier can be closed again */
  /* While the current time is earlier than barrier_no_close_before,
     starting a new sync is not allowed. */
  f64 barrier_no_close_before;
  ......
} vlib_main_t;
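The two timestamps work together roughly as follows (illustrative numbers only):

/* t = 10.0000  barrier_sync closes the barrier -> barrier_epoch = 10.0000
 * t = 10.0010  barrier_release reopens it      -> barrier_epoch = 10.0010,
 *              barrier_no_close_before = 10.0010 + minimum_open
 * Until barrier_no_close_before is reached, a new sync spins in its
 * hold-down loop instead of closing the barrier again. */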
Function walkthrough
- vlib_worker_thread_barrier_sync
The main thread calls this function to tell the workers that a sync is starting; once every worker has entered the sync state, the main thread runs its critical-section work.
#define vlib_worker_thread_barrier_sync(X) {vlib_worker_thread_barrier_sync_int(X, __FUNCTION__);}

void
vlib_worker_thread_barrier_sync_int (vlib_main_t * vm, const char *func_name)
{
  f64 deadline;
  f64 now;
  f64 t_entry;
  f64 t_open;
  f64 t_closed;
  u32 count;

  if (vec_len (vlib_mains) < 2)
    return;

  /* Only the main thread may call this function */
  ASSERT (vlib_get_thread_index () == 0);

  /* vlib_worker_threads[0] is the main thread; record the caller's name */
  vlib_worker_threads[0].barrier_caller = func_name;
  count = vec_len (vlib_mains) - 1; /* number of worker threads */

  /* Record entry relative to last close */
  now = vlib_time_now (vm);
  t_entry = now - vm->barrier_epoch;

  /* Tolerate recursive calls: on a nested call, just bump the depth and return */
  if (++vlib_worker_threads[0].recursion_level > 1)
    {
      barrier_trace_sync_rec (t_entry);
      return;
    }

  /* Count the syncs initiated so far */
  vlib_worker_threads[0].barrier_sync_count++;

  /* Enforce minimum barrier open time to minimize packet loss */
  /* After a sync completes, a new one may not start until the minimum
     open interval has elapsed */
  ASSERT (vm->barrier_no_close_before <= (now + BARRIER_MINIMUM_OPEN_LIMIT));

  while (1)
    {
      now = vlib_time_now (vm);
      /* Barrier hold-down timer expired? */
      if (now >= vm->barrier_no_close_before)
	break;
      if ((vm->barrier_no_close_before - now)
	  > (2.0 * BARRIER_MINIMUM_OPEN_LIMIT))
	{
	  clib_warning ("clock change: would have waited for %.4f seconds",
			(vm->barrier_no_close_before - now));
	  break;
	}
    }

  /* Record time of closure */
  /* Interval since the previous sync, i.e. the open time */
  t_open = now - vm->barrier_epoch;
  vm->barrier_epoch = now;

  /* Timeout: 600 seconds in debug builds, 1 second otherwise */
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /* Set wait_at_barrier to 1 to notify the workers */
  *vlib_worker_threads->wait_at_barrier = 1;

  /* Wait until every worker thread has parked at the barrier */
  while (*vlib_worker_threads->workers_at_barrier != count)
    {
      /* On timeout, report a deadlock and panic */
      if ((now = vlib_time_now (vm)) > deadline)
	{
	  fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
	  os_panic ();
	}
    }

  /* Time from starting the sync until every worker accepted it */
  t_closed = now - vm->barrier_epoch;

  barrier_trace_sync (t_entry, t_open, t_closed);
}
- vlib_worker_thread_barrier_release
After finishing the critical-section work, the main thread calls this function to tell the workers that the sync is over.
/* Ends the sync */
void
vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
  f64 deadline;
  f64 now;
  f64 minimum_open;
  f64 t_entry;
  f64 t_closed_total;
  f64 t_update_main = 0.0;
  int refork_needed = 0;

  if (vec_len (vlib_mains) < 2)
    return;

  ASSERT (vlib_get_thread_index () == 0);

  now = vlib_time_now (vm);
  /* Time between the matching sync call and this release */
  t_entry = now - vm->barrier_epoch;

  /* Drop the recursion depth; if it is still above 0, the sync is not over yet */
  if (--vlib_worker_threads[0].recursion_level > 0)
    {
      barrier_trace_release_rec (t_entry);
      return;
    }
  ......
  deadline = now + BARRIER_SYNC_TIMEOUT;

  /*
   * Note when we let go of the barrier.
   * Workers can use this to derive a reasonably accurate
   * time offset. See vlib_time_now(...)
   */
  vm->time_last_barrier_release = vlib_time_now (vm);
  CLIB_MEMORY_STORE_BARRIER ();

  /* Clear the wait flag */
  *vlib_worker_threads->wait_at_barrier = 0;

  /* Wait for every worker thread to leave the barrier */
  while (*vlib_worker_threads->workers_at_barrier > 0)
    {
      /* On timeout, report a deadlock and panic */
      if ((now = vlib_time_now (vm)) > deadline)
	{
	  fformat (stderr, "%s: worker thread deadlock\n", __FUNCTION__);
	  os_panic ();
	}
    }
  ......
  /* Total duration of the sync */
  t_closed_total = now - vm->barrier_epoch;

  /* Compute the minimum rest period before the next sync may start;
     it is proportional to how long this sync lasted */
  minimum_open = t_closed_total * BARRIER_MINIMUM_OPEN_FACTOR;
  if (minimum_open > BARRIER_MINIMUM_OPEN_LIMIT)
    {
      minimum_open = BARRIER_MINIMUM_OPEN_LIMIT;
    }
  /* Earliest time the next sync may start */
  vm->barrier_no_close_before = now + minimum_open;

  /* Record barrier epoch (used to enforce minimum open time) */
  vm->barrier_epoch = now;

  barrier_trace_release (t_entry, t_closed_total, t_update_main);
}
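The open-time computation above is driven by two constants from vlib/threads.h; their stock values (taken from the VPP source, and possibly different across versions) give a feel for the scale:

#define BARRIER_MINIMUM_OPEN_FACTOR 3
#define BARRIER_MINIMUM_OPEN_LIMIT 50e-6 /* 50 microseconds */

/* Example: a sync that kept the barrier closed for 10 us forces it to
 * stay open for at least 3 * 10 us = 30 us before the next sync. A
 * 100 us sync would ask for 300 us, but the open period is capped at
 * the 50 us limit. */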
vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release may only be called, as a pair, by the main thread, and the pairs may nest. They let the main thread access worker-thread data safely, at the cost of stalling every worker, so the mechanism is relatively expensive.
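Because the recursion depth is tracked, nested pairs behave as a single outer barrier. A minimal sketch (the helper names are hypothetical):

static void
inner_update (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* depth 1 -> 2: returns at once */
  /* ... touch shared state ... */
  vlib_worker_thread_barrier_release (vm); /* depth 2 -> 1: returns at once */
}

static void
outer_update (vlib_main_t * vm)
{
  vlib_worker_thread_barrier_sync (vm);    /* depth 0 -> 1: closes the barrier */
  inner_update (vm);
  vlib_worker_thread_barrier_release (vm); /* depth 1 -> 0: reopens the barrier */
}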
- vlib_worker_thread_barrier_check
After the vpp_main thread starts a sync, each worker thread calls this function to park and wait.
static inline void
vlib_worker_thread_barrier_check (void)
{
  /* If the main thread has started a sync, this thread must park */
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      vlib_main_t *vm = vlib_get_main ();
      u32 thread_index = vm->thread_index;
      f64 t = vlib_time_now (vm);
      ......
      /* One more worker is now waiting */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
      if (CLIB_DEBUG > 0)
	{
	  vm = vlib_get_main ();
	  vm->parked_at_barrier = 1;
	}
      /* Spin until the sync ends */
      while (*vlib_worker_threads->wait_at_barrier)
	;
      /*
       * Recompute the offset from thread-0 time.
       * Note that vlib_time_now adds vm->time_offset, so
       * clear it first. Save the resulting idea of "now", to
       * see how well we're doing. See show_clock_command_fn(...)
       */
      {
	f64 now;
	vm->time_offset = 0.0;
	now = vlib_time_now (vm);
	vm->time_offset = vlib_global_main.time_last_barrier_release - now;
	vm->time_last_barrier_release = vlib_time_now (vm);
      }
      if (CLIB_DEBUG > 0)
	vm->parked_at_barrier = 0;
      /* The sync is over: one fewer worker waiting */
      clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
      ......
    }
}
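The clock resynchronization at the end is simple arithmetic; with illustrative numbers:

/* Suppose thread 0 recorded time_last_barrier_release = 100.000 and this
 * worker's raw clock reads 100.020 right after the barrier opens. Then:
 *   time_offset = 100.000 - 100.020 = -0.020
 * Later vlib_time_now() calls on this worker add this offset, cancelling
 * the 20 ms of skew and reporting a timeline aligned with thread 0's. */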
Example of using the barrier mechanism
As an illustration, take vnet_hw_interface_assign_rx_thread, the main function behind the "set interface rx-placement" command.
Main thread
/* After the main thread receives the command, it eventually calls this function */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
				    u16 queue_id, uword thread_index)
{
  vnet_device_main_t *vdm = &vnet_device_main;
  vlib_main_t *vm, *vm0;
  vnet_device_input_runtime_t *rt;
  vnet_device_and_queue_t *dq;
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

  ASSERT (hw->input_node_index > 0);

  if (vdm->first_worker_thread_index == 0)
    thread_index = 0;

  if (thread_index != 0 &&
      (thread_index < vdm->first_worker_thread_index ||
       thread_index > vdm->last_worker_thread_index))
    {
      thread_index = vdm->next_worker_thread_index++;
      if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
	vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

  vm = vlib_mains[thread_index];
  vm0 = vlib_get_main (); /* the calling thread, normally the main thread */

  /* Tell the workers that a sync is starting */
  vlib_worker_thread_barrier_sync (vm0);

  rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

  vec_add2 (rt->devices_and_queues, dq, 1);
  dq->hw_if_index = hw_if_index;
  dq->dev_instance = hw->dev_instance;
  dq->queue_id = queue_id;
  dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
  rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

  vnet_device_queue_update (vnm, rt);
  vec_validate (hw->input_node_thread_index_by_queue, queue_id);
  vec_validate (hw->rx_mode_by_queue, queue_id);
  hw->input_node_thread_index_by_queue[queue_id] = thread_index;
  hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

  /* Tell the workers that the sync is over */
  vlib_worker_thread_barrier_release (vm0);

  vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}
Worker thread
/* The is_main argument selects main-thread vs. worker-thread behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  ......
  while (1)
    {
      vlib_node_runtime_t *n;
      /* If there are pending RPC requests, hand them off */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
	{
	  if (!is_main) /* only worker threads send RPC requests */
	    vl_api_send_pending_rpc_requests (vm);
	}
      if (!is_main) /* worker thread */
	{
	  /* Mutual exclusion with the main thread: if the main thread has
	     entered the critical section, spin here until it leaves */
	  vlib_worker_thread_barrier_check ();
	  ......
	}
      ......
      vlib_increment_main_loop_counter (vm);
      /* Record time stamp in case there are no enabled nodes and above
	 calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
RPC: how vpp_worker threads hand work to the vpp_main thread
VPP's RPC mechanism is built on top of its API machinery, in which two API messages are registered:
#define foreach_rpc_api_msg \
_(RPC_CALL,rpc_call) \
_(RPC_CALL_REPLY,rpc_call_reply)
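The _(N,n) entries are expanded with the usual VPP API registration idiom; schematically (a sketch of the pattern, not the exact registration code), each entry binds the message id VL_API_##N to the handler vl_api_##n##_t_handler:

#define _(N,n)                                     \
  vl_msg_api_set_handlers (VL_API_##N, #n,         \
			   vl_api_##n##_t_handler, \
			   /* ... endian/print/size hooks ... */);
foreach_rpc_api_msg;
#undef _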
Data structures
- vlib_main_t
typedef struct vlib_main_t
{
  ......
  /* RPC requests, main thread only */
  uword *pending_rpc_requests;      /* RPCs queued for the vpp_main thread to run */
  uword *processing_rpc_requests;   /* the vector of RPCs the vpp_main thread is
                                       currently processing */
  clib_spinlock_t pending_rpc_lock; /* spinlock protecting the two vectors above */
} vlib_main_t;
- vl_api_rpc_call_t
The request message carried by the RPC API:
#ifndef _vl_api_defined_rpc_call
#define _vl_api_defined_rpc_call
typedef VL_API_PACKED(struct _vl_api_rpc_call {
  u16 _vl_msg_id;       /* message id */
  u32 client_index;     /* unused: this API is internal */
  u32 context;
  u64 function;         /* the rpc function to execute */
  u8 multicast;
  u8 need_barrier_sync; /* whether the handler needs barrier protection */
  u8 send_reply;        /* whether to send a reply; usually none is sent */
  u32 data_len;
  u8 data[0];
}) vl_api_rpc_call_t;
#endif
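The zero-length data[0] member is the usual trailing-payload idiom: a message carrying n bytes of argument data is allocated as one contiguous block, exactly as vl_api_rpc_call_main_thread_inline does below:

/* sketch: allocate header plus payload in one shared-memory block */
vl_api_rpc_call_t *mp =
  vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
clib_memcpy_fast (mp->data, data, data_length);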
Function walkthrough
The RPC API handler
static void
vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
{
  vl_api_rpc_call_reply_t *rmp;
  int (*fp) (void *);
  i32 rv = 0;
  vlib_main_t *vm = vlib_get_main ();

  if (mp->function == 0) /* null rpc function pointer: warn */
    {
      rv = -1;
      clib_warning ("rpc NULL function pointer");
    }
  else
    {
      if (mp->need_barrier_sync) /* take the barrier if protection is required */
	vlib_worker_thread_barrier_sync (vm);
      fp = uword_to_pointer (mp->function, int (*)(void *)); /* back to a function pointer */
      rv = fp (mp->data); /* run the function */
      if (mp->need_barrier_sync)
	vlib_worker_thread_barrier_release (vm);
    }

  if (mp->send_reply) /* send a reply to the client if requested; usually not */
    {
      svm_queue_t *q = vl_api_client_index_to_input_queue (mp->client_index);
      if (q)
	{
	  rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
	  rmp->_vl_msg_id = ntohs (VL_API_RPC_CALL_REPLY);
	  rmp->context = mp->context;
	  rmp->retval = rv;
	  vl_msg_api_send_shmem (q, (u8 *) & rmp);
	}
    }
  if (mp->multicast)
    {
      clib_warning ("multicast not yet implemented...");
    }
}

/* Reply handler: not implemented */
static void
vl_api_rpc_call_reply_t_handler (vl_api_rpc_call_reply_t * mp)
{
  clib_warning ("unimplemented");
}
Issuing an RPC
/* Ask the main thread to execute a function for us; the caller may be a
 * worker thread or the main thread itself.
 * force_rpc: force RPC mode, i.e. do not call the function directly but
 * let the API process node execute it. Worker threads must set it to 1;
 * the main thread may set it either way.
 */
always_inline void
vl_api_rpc_call_main_thread_inline (void *fp, u8 * data, u32 data_length,
				    u8 force_rpc)
{
  vl_api_rpc_call_t *mp;
  vlib_main_t *vm_global = &vlib_global_main;
  vlib_main_t *vm = vlib_get_main ();

  /* Main thread and not a forced RPC: call the function directly */
  if ((force_rpc == 0) && (vlib_get_thread_index () == 0))
    {
      void (*call_fp) (void *);

      vlib_worker_thread_barrier_sync (vm);
      call_fp = fp;
      call_fp (data);
      vlib_worker_thread_barrier_release (vm);
      return;
    }

  /* Otherwise, actually do an RPC: allocate the rpc message from shared memory */
  mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
  clib_memset (mp, 0, sizeof (*mp));
  clib_memcpy_fast (mp->data, data, data_length);
  /* The first member must be the message id; the API machinery depends on it.
     This is a built-in message, not a plugin one, so no base message id is
     needed. */
  mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
  mp->function = pointer_to_uword (fp);
  mp->need_barrier_sync = 1;

  /* Add to the pending vector. Thread 0 requires locking. */
  /* The main thread's pending_rpc_requests vector is shared, so it must be
     protected; each worker's own vector is owned exclusively by that worker
     and needs no lock. */
  if (vm == vm_global)
    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
  vec_add1 (vm->pending_rpc_requests, (uword) mp);
  if (vm == vm_global)
    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
/*
 * Check if called from worker threads.
 * If so, make rpc call of fp through shmem.
 * Otherwise, call fp directly
 */
void
vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */
				      0);
}

/*
 * Always make rpc call of fp through shmem, useful for calling from threads
 * not setup as worker threads, such as DPDK callback thread.
 * Forces the call to reach the main thread via shared memory rather than
 * as a direct invocation.
 */
void
vl_api_force_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
{
  vl_api_rpc_call_main_thread_inline (fp, data, data_length, /*force_rpc */
				      1);
}
Coroutines running in the main thread can also issue an RPC through vlib_rpc_call_main_thread:
void *rpc_call_main_thread_cb_fn;

void
vlib_rpc_call_main_thread (void *callback, u8 * args, u32 arg_size)
{
  /* Global function pointer; set during initialization to the address of
     vl_api_rpc_call_main_thread */
  if (rpc_call_main_thread_cb_fn)
    {
      void (*fp) (void *, u8 *, u32) = rpc_call_main_thread_cb_fn;
      (*fp) (callback, args, arg_size);
    }
  else
    clib_warning ("BUG: rpc_call_main_thread_cb_fn NULL!");
}
Handing a worker's RPCs to the main thread
/* A worker moves the RPC requests accumulated in its own
   pending_rpc_requests vector to the main thread's vector */
void
vl_api_send_pending_rpc_requests (vlib_main_t * vm)
{
  vlib_main_t *vm_global = &vlib_global_main;

  ASSERT (vm != vm_global);

  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
  vec_reset_length (vm->pending_rpc_requests);
  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
}
- vlib_main_or_worker_loop
Only worker threads need to move RPC requests over to the main thread.
/* The is_main argument selects main-thread vs. worker-thread behavior */
static_always_inline void
vlib_main_or_worker_loop (vlib_main_t * vm, int is_main)
{
  while (1)
    {
      vlib_node_runtime_t *n;
      /* A worker hands the RPCs collected on this thread to the main thread */
      if (PREDICT_FALSE (_vec_len (vm->pending_rpc_requests) > 0))
	{
	  if (!is_main) /* only workers move their own RPCs to the main thread */
	    vl_api_send_pending_rpc_requests (vm);
	}
      ......
      vlib_increment_main_loop_counter (vm);
      /* Record time stamp in case there are no enabled nodes and above
	 calls do not update time stamp. */
      cpu_time_now = clib_cpu_time_now ();
    }
}
Processing RPCs in the API coroutine
RPCs are processed by the "api-rx-from-ring" coroutine (process node), the same one that handles API messages.
/* *INDENT-OFF* */
VLIB_REGISTER_NODE (vl_api_clnt_node) =
{
.function = vl_api_clnt_process,
.type = VLIB_NODE_TYPE_PROCESS,
.name = "api-rx-from-ring",
.state = VLIB_NODE_STATE_DISABLED,
};
The coroutine's main function is vl_api_clnt_process:
static uword
vl_api_clnt_process (vlib_main_t * vm, vlib_node_runtime_t * node,
		     vlib_frame_t * f)
{
  ......
  /* $$$ pay attention to frame size, control CPU usage */
  while (1)
    {
      /*
       * There's a reason for checking the queue before
       * sleeping. If the vlib application crashes, it's entirely
       * possible for a client to enqueue a connect request
       * during the process restart interval.
       *
       * Unless some force of physics causes the new incarnation
       * of the application to process the request, the client will
       * sit and wait for Godot...
       */
      vector_rate = vlib_last_vector_length_per_node (vm);
      start_time = vlib_time_now (vm);
      while (1)
	{
	  if (vl_mem_api_handle_rpc (vm, node)	/* run pending RPC requests */
	      || vl_mem_api_handle_msg_main (vm, node)) /* run API requests */
	    {
	      vm->api_queue_nonempty = 0;
	      VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
	      sleep_time = 20.0;
	      break;
	    }
	  ......
	}
      ......
    }
  return 0;
}
int
vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
{
  api_main_t *am = &api_main;
  int i;
  uword *tmp, mp;

  /*
   * Swap pending and processing vectors, then process the RPCs
   * Avoid deadlock conditions by construction.
   * The pending requests are swapped into processing_rpc_requests under
   * the lock, so the critical section stays short.
   */
  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
  tmp = vm->processing_rpc_requests;
  vec_reset_length (tmp);
  vm->processing_rpc_requests = vm->pending_rpc_requests;
  vm->pending_rpc_requests = tmp;
  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);

  /*
   * RPCs are used to reflect function calls to thread 0
   * when the underlying code is not thread-safe.
   *
   * Grabbing the thread barrier across a set of RPCs
   * greatly increases efficiency, and avoids
   * running afoul of the barrier sync holddown timer.
   * The barrier sync code supports recursive locking.
   *
   * We really need to rewrite RPC-based code...
   */
  if (PREDICT_TRUE (vec_len (vm->processing_rpc_requests)))
    {
      vl_msg_api_barrier_sync ();
      for (i = 0; i < vec_len (vm->processing_rpc_requests); i++) /* each RPC in turn */
	{
	  mp = vm->processing_rpc_requests[i];
	  vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
	}
      vl_msg_api_barrier_release ();
    }

  return 0;
}
/* This is only to be called from a vlib/vnet app */
void
vl_msg_api_handler_with_vm_node (api_main_t * am,
				 void *the_msg, vlib_main_t * vm,
				 vlib_node_runtime_t * node)
{
  u16 id = ntohs (*((u16 *) the_msg)); /* the message id is always the first member */
  u8 *(*handler) (void *, void *, void *);
  u8 *(*print_fp) (void *, void *);
  ......
  /* Look up the handler registered for this id; for VL_API_RPC_CALL that
     is vl_api_rpc_call_t_handler */
  if (id < vec_len (am->msg_handlers) && am->msg_handlers[id])
    {
      handler = (void *) am->msg_handlers[id];
      if (PREDICT_FALSE (am->rx_trace && am->rx_trace->enabled))
	vl_msg_api_trace (am, am->rx_trace, the_msg);
      if (PREDICT_FALSE (am->msg_print_flag))
	{
	  fformat (stdout, "[%d]: %s\n", id, am->msg_names[id]);
	  print_fp = (void *) am->msg_print_handlers[id];
	  if (print_fp == 0)
	    {
	      fformat (stdout, "[no registered print fn for msg %d]\n", id);
	    }
	  else
	    {
	      (*print_fp) (the_msg, vm);
	    }
	}
      if (!am->is_mp_safe[id])
	{
	  vl_msg_api_barrier_trace_context (am->msg_names[id]);
	  vl_msg_api_barrier_sync ();
	}
      /* Invoke the handler, i.e. vl_api_rpc_call_t_handler */
      (*handler) (the_msg, vm, node);
      if (!am->is_mp_safe[id])
	vl_msg_api_barrier_release ();
    }
  else
    {
      clib_warning ("no handler for msg id %d", id);
    }
  /*
   * Special-case, so we can e.g. bounce messages off the vnet
   * main thread without copying them...
   */
  if (!(am->message_bounce[id]))
    vl_msg_api_free (the_msg);
  ......
}
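Tying the pieces together, the life of one worker-initiated RPC runs through exactly the functions shown above:

/* worker:      vl_api_rpc_call_main_thread (fp, data, len)
 *                -> vec_add1 (worker_vm->pending_rpc_requests, mp)
 * worker loop: vl_api_send_pending_rpc_requests (worker_vm)
 *                -> moves mp to vlib_global_main.pending_rpc_requests
 * main thread, "api-rx-from-ring" process:
 *              vl_mem_api_handle_rpc (vm, node)
 *                -> swaps the vectors, then for each message calls
 *                   vl_msg_api_handler_with_vm_node -> vl_api_rpc_call_t_handler
 *                -> fp (data) runs on the main thread under the barrier */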