共计 6328 个字符,预计需要花费 16 分钟才能阅读完成。
sonic 处理 netlink 事件
sonic 在处理路由、接口 up/down、接口地址变化、team 等事件时极大地依赖内核。sonic 通过监听 rtnetlink(rtnl)事件来响应 Linux 内核事件,从而感知相关信息的变化。
libnl
sonic 使用 libnl 库来收发和解析 netlink 消息,详细内容可以访问 http://www.infradead.org/~tgr/libnl/ 。sonic 在 libnl 库的基础上封装了 NetLink 类来进行 netlink 操作。
NetLink
/*
 * Selectable wrapper around a libnl NETLINK_ROUTE socket.
 * Every valid message read from the socket is forwarded to NetDispatcher
 * through the static onNetlinkMsg callback.
 */
class NetLink : public Selectable {
public:
    NetLink(int pri = 0);
    ~NetLink() override;

    /*
     * This object owns the raw nl_sock handle (the destructor, defined
     * elsewhere, presumably releases it), so copies would share and
     * double-release the socket. Copying is therefore disabled.
     */
    NetLink(const NetLink &) = delete;
    NetLink &operator=(const NetLink &) = delete;

    /* Join rtnetlink multicast group rtnlGroup (e.g. RTNLGRP_IPV4_IFADDR). Throws on failure. */
    void registerGroup(int rtnlGroup);

    /* Ask the kernel to dump all objects for rtmGetCommand (e.g. RTM_GETADDR). Throws on failure. */
    void dumpRequest(int rtmGetCommand);

    /* Returns the underlying netlink socket descriptor (Selectable interface). */
    int getFd() override;

    /* Reads pending messages; libnl invokes onNetlinkMsg for each valid one. */
    void readData() override;

private:
    /* libnl NL_CB_VALID callback: forwards the message to NetDispatcher. */
    static int onNetlinkMsg(struct nl_msg *msg, void *arg);

    nl_sock *m_socket; /* libnl socket handle owned by this object */
};
NetLink::NetLink(int pri) :
/*
 * Create a non-blocking NETLINK_ROUTE socket whose valid messages are
 * delivered to onNetlinkMsg.
 *
 * pri: Selectable priority.
 * Throws std::system_error if the socket cannot be allocated or connected.
 * Failures to tune the socket (non-blocking mode, buffer size) are logged
 * but are not fatal.
 */
NetLink::NetLink(int pri) :
    Selectable(pri), m_socket(nullptr)
{
    m_socket = nl_socket_alloc();
    if (!m_socket)
    {
        SWSS_LOG_ERROR("Unable to allocated netlink socket");
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to allocated netlink socket");
    }

    // Do not check sequence numbers on received messages.
    nl_socket_disable_seq_check(m_socket);

    // Register the callback: every valid message read by readData() is
    // handed to onNetlinkMsg with this object as the argument.
    nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this);

    int err = nl_connect(m_socket, NETLINK_ROUTE);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err));
        nl_socket_free(m_socket);
        m_socket = nullptr;
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    }

    // Non-blocking, so a read with no pending data returns instead of blocking.
    err = nl_socket_set_nonblocking(m_socket);
    if (err < 0)
        SWSS_LOG_ERROR("Unable to set netlink socket non-blocking: %s", nl_geterror(err));

    // Receive buffer of 2 MB (2097152 bytes); txbuf 0 lets libnl choose its default.
    err = nl_socket_set_buffer_size(m_socket, 2097152, 0);
    if (err < 0)
        SWSS_LOG_ERROR("Unable to set netlink socket buffer size: %s", nl_geterror(err));
}
void NetLink::registerGroup(int rtnlGroup)
/*
 * Subscribe this socket to the rtnetlink multicast group rtnlGroup.
 * Throws std::system_error if libnl refuses the membership.
 */
void NetLink::registerGroup(int rtnlGroup)
{
    const int rc = nl_socket_add_membership(m_socket, rtnlGroup);
    if (rc >= 0)
        return;

    SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup,
                   nl_geterror(rc));
    throw system_error(make_error_code(errc::address_not_available),
                       "Unable to register group");
}
int NetLink::getFd()
/* Selectable interface: return the descriptor of the underlying netlink socket. */
int NetLink::getFd()
{
    const int fd = nl_socket_get_fd(m_socket);
    return fd;
}
void NetLink::readData()
/*
 * Drain the socket via libnl; libnl invokes onNetlinkMsg for each valid
 * message. Read errors are logged, never thrown.
 */
void NetLink::readData()
{
    // Retry if the process was interrupted by a signal.
    int rc = nl_recvmsgs_default(m_socket);
    while (rc == -NLE_INTR)
        rc = nl_recvmsgs_default(m_socket);

    if (rc >= 0)
        return;

    switch (rc)
    {
    case -NLE_NOMEM:
        SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message");
        break;
    case -NLE_AGAIN:
        // Nothing (more) to read on the non-blocking socket.
        SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket");
        break;
    default:
        SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", rc);
        break;
    }
}
int NetLink::onNetlinkMsg(……)
// 回调函数,读取消息时回调该函数,该函数是一个消息分发器
int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg)
{NetDispatcher::getInstance().onNetlinkMessage(msg);
return NL_OK;
}
void NetLink::dumpRequest(…..)
/*
 * Request a full dump from the kernel for rtmGetCommand (e.g. RTM_GETADDR),
 * across all address families. The replies arrive on this socket and are
 * processed by readData() like any other message.
 *
 * Throws std::system_error if the request cannot be sent.
 */
void NetLink::dumpRequest(int rtmGetCommand)
{
    int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP);
    if (err < 0)
    {
        // rtmGetCommand is an RTM message type, not a multicast group.
        SWSS_LOG_ERROR("Unable to request dump for command %d: %s", rtmGetCommand,
                       nl_geterror(err));
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to request dump");
    }
}
消息分发器
class NetMsg;

/*
 * Process-wide dispatcher that routes each received netlink message to the
 * NetMsg handler registered for its nlmsg_type.
 */
class NetDispatcher {
public:
    /* Returns the single process-wide dispatcher instance. */
    static NetDispatcher& getInstance();

    /* Singleton: a copy would carry a separate, silently diverging handler table. */
    NetDispatcher(const NetDispatcher &) = delete;
    NetDispatcher& operator=(const NetDispatcher &) = delete;

    /*
     * Register callback class according to message-type.
     *
     * Throws if a handler is already registered for nlmsg_type.
     * Only the raw pointer is stored; presumably the caller must keep
     * callback alive while it is registered.
     */
    void registerMessageHandler(int nlmsg_type, NetMsg *callback);

    /*
     * Called by NetLink or FpmLink classes as indication of new packet arrival.
     * Messages whose type has no registered handler are dropped.
     */
    void onNetlinkMessage(struct nl_msg *msg);

private:
    NetDispatcher() = default;

    /* nl_msg_parse callback API */
    static void nlCallback(struct nl_object *obj, void *context);

    /* nlmsg_type -> handler (non-owning pointers). */
    std::map<int, NetMsg * > m_handlers;
};
/*
 * Interface for consumers of parsed netlink objects. Implementations are
 * registered per message type with NetDispatcher.
 */
class NetMsg {
public:
    /* Polymorphic base: deleting a handler through a NetMsg* must run the derived destructor. */
    virtual ~NetMsg() = default;

    /* Called by NetDispatcher when netmsg matches filters */
    virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0;
};
}
NetDispatcher& NetDispatcher::getInstance()
/* Return the one dispatcher for this process, created on the first call. */
NetDispatcher& NetDispatcher::getInstance()
{
    static NetDispatcher instance;
    return instance;
}
void NetDispatcher::registerMessageHandler()
/*
 * Register callback as the handler for netlink messages of type nlmsg_type.
 *
 * Throws a C string (const char*) if callback is null or if a handler is
 * already registered for nlmsg_type. Note that such a throw is NOT caught by
 * catch (const std::exception&).
 */
void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)
{
    // A null handler would be dereferenced later in nlCallback; reject it now.
    if (callback == nullptr)
        throw "Trying to register a null netlink message handler";

    // Single lookup: emplace() never overwrites an existing entry and reports
    // whether it inserted.
    if (!m_handlers.emplace(nlmsg_type, callback).second)
        throw "Trying to registered on already registerd netlink message";
}
void NetDispatcher::nlCallback()
void NetDispatcher::nlCallback(struct nl_object *obj, void *context)
{NetMsg *callback = (NetMsg *)context;
callback->onMsg(nl_object_get_msgtype(obj), obj);
}
void NetDispatcher::onNetlinkMessage()
void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink 回调函数的真实实现
{struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);// 获取 netlink 消息头
auto callback = m_handlers.find(nlmsghdr->nlmsg_type);// 获取消息类型对应的 NetMsg 描述结构
/* Drop not registered messages */
if (callback == m_handlers.end())// 没有对应的消息处理函数
return;
// 解析消息,调用 NetDispatcher::nlCallback
nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second));
}
使用示例
我们以接口地址同步(intfsyncd)为例进行说明。
实现 NetMsg
class IntfSync : public NetMsg
{
public:
enum {MAX_ADDR_SIZE = 64};
IntfSync(DBConnector *db);// 连接数据库
virtual void onMsg(int nlmsg_type, struct nl_object *obj);
private:
ProducerStateTable m_intfTable;
};
}
// 消息处理函数
void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj)
{char addrStr[MAX_ADDR_SIZE + 1] = {0};
struct rtnl_addr *addr = (struct rtnl_addr *)obj;
string key;
string scope = "global";
string family;
// 响应新地址,获取地址,删除地址三个信息
if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) &&
(nlmsg_type != RTM_DELADDR))
return;
/* Don't sync local routes,不同步 local 地址信息 */
if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE)
{
scope = "local";
return;
}
if (rtnl_addr_get_family(addr) == AF_INET)
family = IPV4_NAME;
else if (rtnl_addr_get_family(addr) == AF_INET6)
family = IPV6_NAME;
else
// Not supported
return;
// 获取接口名字以及地址,组合成 key
key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr));
key+= ":";
nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE);
key+= addrStr;
if (nlmsg_type == RTM_DELADDR)// 地址删除,删除 key
{m_intfTable.del(key);
return;
}
// 添加 key
std::vector<FieldValueTuple> fvVector;
FieldValueTuple f("family", family);
FieldValueTuple s("scope", scope);
fvVector.push_back(s);
fvVector.push_back(f);
m_intfTable.set(key, fvVector);
}
实现 main
// intfsyncd entry point: subscribes to kernel interface-address notifications
// and mirrors them into APPL_DB through IntfSync. Runs until an exception
// occurs, in which case it reports the error and exits with status 1.
int main(int argc, char **argv)
{
    swss::Logger::linkToDbNative("intfsyncd");
    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
    IntfSync sync(&db);

    // Route address add/delete messages to IntfSync. Kernel dump replies to
    // RTM_GETADDR are expected to arrive as RTM_NEWADDR (TODO confirm).
    NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync);
    NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync);

    try
    {
        NetLink netlink;
        Select s;

        // Join the IPv4 and IPv6 interface-address multicast groups.
        netlink.registerGroup(RTNLGRP_IPV4_IFADDR);
        netlink.registerGroup(RTNLGRP_IPV6_IFADDR);
        cout << "Listens to interface messages..." << endl;

        // Request a dump of all existing addresses so the DB starts in sync;
        // the replies arrive on the same socket as live notifications.
        netlink.dumpRequest(RTM_GETADDR);

        s.addSelectable(&netlink);
        while (true)
        {
            // Presumably Select::select() calls NetLink::readData() when the
            // socket is readable, which dispatches to IntfSync — confirm.
            Selectable *temps = nullptr;
            s.select(&temps);
        }
    }
    catch (const std::exception& e)
    {
        cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
        // Non-zero so a supervisor can tell the daemon failed.
        return 1;
    }
}
正文完