乐趣区

sonic处理netlink事件

sonic 处理 netlink 事件

SONiC 在处理路由、接口 up/down、接口地址变化、team 等事件时极大地依赖内核。SONiC 通过监听 rtnetlink(rtnl)事件来响应 Linux 内核事件,从而感知相关信息的变化。

libnl

SONiC 使用 libnl 库来收发和解析 netlink 消息,libnl 的详细文档可以访问 http://www.infradead.org/~tgr/libnl/。SONiC 在 libnl 库的基础上封装了 NetLink 类来进行 netlink 操作。

NetLink

/*
 * NetLink wraps a libnl netlink socket connected to NETLINK_ROUTE and exposes
 * it to the swss Select loop through the Selectable interface.
 *
 * The object holds a raw nl_sock* that it allocates itself (and frees on
 * constructor failure), so it must not be copied: two copies would share one
 * socket handle.
 */
class NetLink : public Selectable {
public:
    NetLink(int pri = 0);
    ~NetLink() override;

    // Non-copyable: the raw socket handle must have exactly one owner.
    NetLink(const NetLink &) = delete;
    NetLink &operator=(const NetLink &) = delete;

    /* Join an rtnetlink multicast group (e.g. RTNLGRP_IPV4_IFADDR) to receive its events. */
    void registerGroup(int rtnlGroup);
    /* Send an NLM_F_DUMP request for the given RTM_GET* message type. */
    void dumpRequest(int rtmGetCommand);

    /* Return the socket's file descriptor so Select can poll it. */
    int getFd() override;
    /* Read pending data from the socket; libnl invokes onNetlinkMsg per valid message. */
    void readData() override;

private:
    /* libnl NL_CB_VALID callback; forwards each message to NetDispatcher. */
    static int onNetlinkMsg(struct nl_msg *msg, void *arg);

    nl_sock *m_socket; // libnl socket handle owned by this object
};

NetLink::NetLink(int pri) :

/*
 * Allocate a libnl socket, connect it to NETLINK_ROUTE and prepare it for
 * use from the Select loop (non-blocking, enlarged receive buffer).
 *
 * @param pri  Selectable priority, forwarded to the Selectable base.
 * @throws system_error if the socket cannot be allocated or connected.
 */
NetLink::NetLink(int pri) :
    Selectable(pri), m_socket(NULL)
{
    m_socket = nl_socket_alloc();
    if (!m_socket)
    {
        SWSS_LOG_ERROR("Unable to allocated netlink socket");
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to allocated netlink socket");
    }

    // Multicast notifications are not replies to our own requests, so libnl's
    // sequence-number check is disabled.
    nl_socket_disable_seq_check(m_socket);
    // Every valid message read from the socket is handed to onNetlinkMsg.
    nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this);

    int err = nl_connect(m_socket, NETLINK_ROUTE);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err));
        nl_socket_free(m_socket);
        m_socket = NULL;
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    }

    // readData() is driven by Select, so reads must not block.
    err = nl_socket_set_nonblocking(m_socket);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket non-blocking: %s", nl_geterror(err));
    }

    /* Request a 2 MB (2097152-byte) receive buffer; a tx size of 0 lets libnl pick its default. */
    err = nl_socket_set_buffer_size(m_socket, 2097152, 0);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket buffer size: %s", nl_geterror(err));
    }
}

void NetLink::registerGroup(int rtnlGroup)

/*
 * Subscribe the socket to one rtnetlink multicast group.
 *
 * @param rtnlGroup  RTNLGRP_* group identifier.
 * @throws system_error when libnl reports a negative error code.
 */
void NetLink::registerGroup(int rtnlGroup)
{
    const int rc = nl_socket_add_membership(m_socket, rtnlGroup);
    if (rc >= 0)
        return;

    SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup, nl_geterror(rc));
    throw system_error(make_error_code(errc::address_not_available),
                       "Unable to register group");
}

int NetLink::getFd()

/* Expose the file descriptor of the underlying libnl socket so Select can poll it. */
int NetLink::getFd()
{
    const int fd = nl_socket_get_fd(m_socket);
    return fd;
}

void NetLink::readData()

/*
 * Read pending data from the netlink socket. libnl invokes the NL_CB_VALID
 * callback (onNetlinkMsg) for each valid message. A read interrupted by a
 * signal is retried. Any other failure is logged and never thrown.
 */
void NetLink::readData()
{
    int rc;

    for (;;)
    {
        rc = nl_recvmsgs_default(m_socket);
        if (rc != -NLE_INTR)
            break;
        // Interrupted by a signal: retry the read.
    }

    if (rc >= 0)
        return;

    switch (rc)
    {
    case -NLE_NOMEM:
        SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message");
        break;
    case -NLE_AGAIN:
        // Nothing more to read on the non-blocking socket; debug level only.
        SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket");
        break;
    default:
        SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", rc);
        break;
    }
}

int NetLink::onNetlinkMsg(……)

// 回调函数,读取消息时回调该函数,该函数是一个消息分发器
int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg)
{NetDispatcher::getInstance().onNetlinkMessage(msg);
    return NL_OK;
}

void NetLink::dumpRequest(…..)

/*
 * Ask the kernel to dump all objects of one kind (e.g. RTM_GETADDR for all
 * addresses, AF_UNSPEC = every address family). The replies are read from
 * this same socket by readData().
 *
 * @param rtmGetCommand  RTM_GET* message type to request (not a multicast group).
 * @throws system_error if the request cannot be sent.
 */
void NetLink::dumpRequest(int rtmGetCommand)
{
    int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP);
    if (err < 0)
    {
        // The argument is a message type, not a group: say so in the log.
        SWSS_LOG_ERROR("Unable to request dump for message type %d: %s", rtmGetCommand,
                       nl_geterror(err));
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to request dump");
    }
}

消息分发器

class NetMsg;

/*
 * Process-wide dispatcher that routes parsed netlink objects to the NetMsg
 * handler registered for their netlink message type.
 */
class NetDispatcher {
public:
    /* Return the single dispatcher instance shared by the whole process. */
    static NetDispatcher& getInstance();

    // Singleton: a copy would carry its own, diverging handler map.
    NetDispatcher(const NetDispatcher &) = delete;
    NetDispatcher &operator=(const NetDispatcher &) = delete;

    /*
     * Register the handler for one netlink message type (e.g. RTM_NEWADDR).
     * The pointer is stored as-is; the caller must keep the handler alive.
     *
     * Throws (a C-string message) if a handler is already registered for
     * nlmsg_type.
     */
    void registerMessageHandler(int nlmsg_type, NetMsg *callback);

    /*
     * Called by NetLink or FpmLink classes as indication of new packet arrival.
     * Looks up the handler for the message type and has libnl parse the
     * message for it. Messages of unregistered types are dropped.
     */
    void onNetlinkMessage(struct nl_msg *msg);

private:
    NetDispatcher() = default;

    /* nl_msg_parse callback API; context is the selected NetMsg handler. */
    static void nlCallback(struct nl_object *obj, void *context);

    std::map<int, NetMsg *> m_handlers; // nlmsg_type -> handler (not owned)
};
/*
 * Interface for consumers of parsed netlink objects. Implementations are
 * registered with NetDispatcher per netlink message type.
 */
class NetMsg {
public:
    // Polymorphic base: deleting through NetMsg* must run the derived destructor.
    virtual ~NetMsg() = default;

    /* Called by NetDispatcher when netmsg matches filters */
    virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0;
};

}

NetDispatcher& NetDispatcher::getInstance()

/*
 * Accessor for the process-wide dispatcher. The instance is a function-local
 * static: it is created on first call and the same object is returned to
 * every caller.
 */
NetDispatcher& NetDispatcher::getInstance()
{
    static NetDispatcher s_dispatcher;
    return s_dispatcher;
}

void NetDispatcher::registerMessageHandler()

/*
 * Bind a handler to one netlink message type. Each type can be bound only
 * once. A second registration throws a C-string message and leaves the
 * existing binding untouched.
 */
void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)
{
    const bool inserted = m_handlers.emplace(nlmsg_type, callback).second;
    if (!inserted)
        throw "Trying to registered on already registerd netlink message";
}

void NetDispatcher::nlCallback()

void NetDispatcher::nlCallback(struct nl_object *obj, void *context)
{NetMsg *callback = (NetMsg *)context;
    callback->onMsg(nl_object_get_msgtype(obj), obj);
}

void NetDispatcher::onNetlinkMessage()

void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink 回调函数的真实实现
{struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);// 获取 netlink 消息头
    auto callback = m_handlers.find(nlmsghdr->nlmsg_type);// 获取消息类型对应的 NetMsg 描述结构

    /* Drop not registered messages */
    if (callback == m_handlers.end())// 没有对应的消息处理函数
        return;
    // 解析消息,调用 NetDispatcher::nlCallback 
    nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second));
}

使用实例

我们以接口管理为例进行说明。

实现 NetMsg

class IntfSync : public NetMsg
{
public:
    enum {MAX_ADDR_SIZE = 64};

    IntfSync(DBConnector *db);// 连接数据库

    virtual void onMsg(int nlmsg_type, struct nl_object *obj);

private:
    ProducerStateTable m_intfTable;
};

}
// 消息处理函数
void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj)
{char addrStr[MAX_ADDR_SIZE + 1] = {0};
    struct rtnl_addr *addr = (struct rtnl_addr *)obj;
    string key;
    string scope = "global";
    string family;
    // 响应新地址,获取地址,删除地址三个信息
    if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) &&
        (nlmsg_type != RTM_DELADDR))
        return;

    /* Don't sync local routes,不同步 local 地址信息 */
    if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE)
    {
        scope = "local";
        return;
    }

    if (rtnl_addr_get_family(addr) == AF_INET)
        family = IPV4_NAME;
    else if (rtnl_addr_get_family(addr) == AF_INET6)
        family = IPV6_NAME;
    else
        // Not supported
        return;
    // 获取接口名字以及地址,组合成 key
    key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr));
    key+= ":";
    nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE);
    key+= addrStr;
    if (nlmsg_type == RTM_DELADDR)// 地址删除,删除 key
    {m_intfTable.del(key);
        return;
    }
    // 添加 key
    std::vector<FieldValueTuple> fvVector;
    FieldValueTuple f("family", family);
    FieldValueTuple s("scope", scope);
    fvVector.push_back(s);
    fvVector.push_back(f);
    m_intfTable.set(key, fvVector);
}

实现 main

/*
 * intfsyncd entry point. It subscribes to kernel IPv4/IPv6 address
 * notifications, requests a one-time dump of the current addresses, and then
 * processes events forever through IntfSync, which writes to APPL_DB.
 *
 * The event loop only ends when an exception is thrown. In that case the
 * exception is reported and the process exits with a non-zero status, so a
 * supervisor sees a failure rather than a clean exit.
 */
int main(int argc, char **argv)
{
    swss::Logger::linkToDbNative("intfsyncd");
    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
    IntfSync sync(&db);

    // Route address add/delete messages to the IntfSync handler.
    NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync);
    NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync);

    try
    {
        NetLink netlink;
        Select s;

        // Join the IPv4 and IPv6 address-change multicast groups.
        netlink.registerGroup(RTNLGRP_IPV4_IFADDR);
        netlink.registerGroup(RTNLGRP_IPV6_IFADDR);
        cout << "Listens to interface messages..." << endl;
        // Request all existing addresses so the table starts out complete.
        netlink.dumpRequest(RTM_GETADDR);

        s.addSelectable(&netlink);
        while (true)
        {
            Selectable *temps;
            s.select(&temps);
        }
    }
    catch (const std::exception& e)
    {
        cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
    }

    // Reaching here means the event loop was aborted by an exception.
    return 1;
}
退出移动版