sonic处理netlink事件

sonic在处理路由,接口up/down,接口地址变化,team等事件上极大地依赖内核。sonic通过监听rtnl(rtnetlink)事件来响应linux事件,从而感知相关信息的变化。

libnl

sonic使用libnl库来操作netlink事件,详细内容可以访问 http://www.infradead.org/~tgr/libnl/ 。sonic在libnl库基础上封装了类NetLink进行netlink操作。

NetLink

class NetLink : public Selectable {public:    NetLink(int pri = 0);    ~NetLink() override;    void registerGroup(int rtnlGroup);//注册想要监听的事件,加入组播组    void dumpRequest(int rtmGetCommand);    int getFd() override;//判断句柄是否在select的可用事件中    void readData() override;//获取socket中的信息,触发回调函数private:    static int onNetlinkMsg(struct nl_msg *msg, void *arg);//回调函数    nl_sock *m_socket;//套接口描述符};

NetLink::NetLink(int pri)

/*
 * Allocate a libnl socket, install onNetlinkMsg as the handler for valid
 * messages, connect to NETLINK_ROUTE, and switch the socket to non-blocking
 * mode with an enlarged receive buffer.
 *
 * @param pri  priority forwarded to the Selectable base class.
 * @throws system_error (errc::address_not_available) if the socket cannot
 *         be allocated or connected.
 */
NetLink::NetLink(int pri) :
    Selectable(pri), m_socket(nullptr)
{
    /* 2097152 bytes = 2 MiB receive buffer. */
    constexpr int kRxBufferBytes = 2 * 1024 * 1024;

    m_socket = nl_socket_alloc();
    if (!m_socket)
    {
        SWSS_LOG_ERROR("Unable to allocated netlink socket");
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to allocated netlink socket");
    }

    /* Do not validate sequence numbers on received messages. */
    nl_socket_disable_seq_check(m_socket);

    /* Every valid message read from the socket is handed to onNetlinkMsg. */
    nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this);

    /* Connect to the kernel's routing netlink family. */
    int err = nl_connect(m_socket, NETLINK_ROUTE);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err));
        /* Release the socket here; the object is never fully constructed. */
        nl_socket_free(m_socket);
        m_socket = nullptr;
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    }

    /* Best effort from here on: failures are logged, not fatal. */
    err = nl_socket_set_nonblocking(m_socket);
    if (err < 0)
        SWSS_LOG_ERROR("Unable to set netlink socket non-blocking: %s", nl_geterror(err));

    /* A tx size of 0 leaves the transmit buffer at the libnl default. */
    err = nl_socket_set_buffer_size(m_socket, kRxBufferBytes, 0);
    if (err < 0)
        SWSS_LOG_ERROR("Unable to set netlink socket buffer size: %s", nl_geterror(err));
}

void NetLink::registerGroup(int rtnlGroup)

void NetLink::registerGroup(int rtnlGroup){    int err = nl_socket_add_membership(m_socket, rtnlGroup);//加入组播组    if (err < 0)    {        SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup,                       nl_geterror(err));        throw system_error(make_error_code(errc::address_not_available),                           "Unable to register group");    }}

int NetLink::getFd()

int NetLink::getFd()//获取套接口句柄{    return nl_socket_get_fd(m_socket);}

void NetLink::readData()

void NetLink::readData(){    int err;    do    {        err = nl_recvmsgs_default(m_socket);//读取数据,有libnl触发回调函数,处理业务    }    while(err == -NLE_INTR); // Retry if the process was interrupted by a signal    if (err < 0)    {        if (err == -NLE_NOMEM)            SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message");        else if (err == -NLE_AGAIN)            SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket");        else            SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", err);    }}

int NetLink::onNetlinkMsg(......)

//回调函数,读取消息时回调该函数,该函数是一个消息分发器int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg){    NetDispatcher::getInstance().onNetlinkMessage(msg);    return NL_OK;}

void NetLink::dumpRequest(.....)

void NetLink::dumpRequest(int rtmGetCommand)//用于获取信息,实现get命令,查看内核相关信息{    int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP);    if (err < 0)    {        SWSS_LOG_ERROR("Unable to request dump on group %d: %s", rtmGetCommand,                       nl_geterror(err));        throw system_error(make_error_code(errc::address_not_available),                           "Unable to request dump");    }}

消息分发器

/* NetDispatcher stores NetMsg pointers, so the name must be known first. */
class NetMsg;

/*
 * Process-wide dispatcher that routes netlink messages to the NetMsg
 * handler registered for their message type.
 */
class NetDispatcher
{
public:
    /* Access the single dispatcher instance. */
    static NetDispatcher& getInstance();

    /* There is exactly one dispatcher; copying it is never intended. */
    NetDispatcher(const NetDispatcher&) = delete;
    NetDispatcher& operator=(const NetDispatcher&) = delete;

    /*
     * Register callback class according to message-type.
     *
     * Throw exception if a handler is already registered for nlmsg_type.
     */
    void registerMessageHandler(int nlmsg_type, NetMsg *callback);

    /*
     * Called by NetLink or FpmLink classes as indication of new packet arrival
     */
    void onNetlinkMessage(struct nl_msg *msg);

private:
    NetDispatcher() = default;

    /* nl_msg_parse callback API */
    static void nlCallback(struct nl_object *obj, void *context);

    /* Message type -> handler; the pointers are not owned by the dispatcher. */
    std::map<int, NetMsg * > m_handlers;
};

/* Interface implemented by every consumer of dispatched netlink messages. */
class NetMsg
{
public:
    /* Polymorphic base: allow safe destruction through a NetMsg pointer. */
    virtual ~NetMsg() = default;

    /* Called by NetDispatcher when netmsg matches filters */
    virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0;
};

NetDispatcher& NetDispatcher::getInstance()

/* Return the single dispatcher, a function-local static object. */
NetDispatcher& NetDispatcher::getInstance()
{
    static NetDispatcher dispatcher;
    return dispatcher;
}

void NetDispatcher::registerMessageHandler()

void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)//注册回调函数{    if (m_handlers.find(nlmsg_type) != m_handlers.end())        throw "Trying to registered on already registerd netlink message";    m_handlers[nlmsg_type] = callback;}

void NetDispatcher::nlCallback()

void NetDispatcher::nlCallback(struct nl_object *obj, void *context){    NetMsg *callback = (NetMsg *)context;    callback->onMsg(nl_object_get_msgtype(obj), obj);}

void NetDispatcher::onNetlinkMessage()

void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink回调函数的真实实现{    struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);//获取netlink消息头    auto callback = m_handlers.find(nlmsghdr->nlmsg_type);//获取消息类型对应的NetMsg描述结构    /* Drop not registered messages */    if (callback == m_handlers.end())//没有对应的消息处理函数        return;    //解析消息,调用NetDispatcher::nlCallback     nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second));}

使用实例

我们以接口管理为例进行说明。

实现NetMsg

class IntfSync : public NetMsg{public:    enum { MAX_ADDR_SIZE = 64 };    IntfSync(DBConnector *db);//连接数据库    virtual void onMsg(int nlmsg_type, struct nl_object *obj);private:    ProducerStateTable m_intfTable;};}//消息处理函数void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj){    char addrStr[MAX_ADDR_SIZE + 1] = {0};    struct rtnl_addr *addr = (struct rtnl_addr *)obj;    string key;    string scope = "global";    string family;    //响应新地址,获取地址,删除地址三个信息    if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) &&        (nlmsg_type != RTM_DELADDR))        return;    /* Don't sync local routes,不同步local地址信息 */    if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE)    {        scope = "local";        return;    }    if (rtnl_addr_get_family(addr) == AF_INET)        family = IPV4_NAME;    else if (rtnl_addr_get_family(addr) == AF_INET6)        family = IPV6_NAME;    else        // Not supported        return;    //获取接口名字以及地址,组合成key    key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr));    key+= ":";    nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE);    key+= addrStr;    if (nlmsg_type == RTM_DELADDR)//地址删除,删除key    {        m_intfTable.del(key);        return;    }    //添加key    std::vector<FieldValueTuple> fvVector;    FieldValueTuple f("family", family);    FieldValueTuple s("scope", scope);    fvVector.push_back(s);    fvVector.push_back(f);    m_intfTable.set(key, fvVector);}

实现main

int main(int argc, char **argv){    swss::Logger::linkToDbNative("intfsyncd");    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);//连接APPL_DB    IntfSync sync(&db);//实例化netmsg    //订阅消息,加入组播组    NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync);    NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync);    while (1)    {        try        {            NetLink netlink;            Select s;            netlink.registerGroup(RTNLGRP_IPV4_IFADDR);            netlink.registerGroup(RTNLGRP_IPV6_IFADDR);            cout << "Listens to interface messages..." << endl;            netlink.dumpRequest(RTM_GETADDR);//打印所有地址            s.addSelectable(&netlink);//加入select事件            while (true)            {                Selectable *temps;                s.select(&temps);//监听select事件            }        }        catch (const std::exception& e)        {            cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;            return 0;        }    }    return 1;}