sonic配置team与实现机制

sonic实现team代码框架图:

sonic修改lag模式配置步骤

1.修改文件teamd.j2

docker exec -it teamd bash
cd /usr/share/sonic/templates/
vim teamd.j2

例如将动态模式改成静态模式:

源文件:

{
    "device": "{{ pc }}",
    "hwaddr": "{{ hwaddr }}",
    "runner": {
        "name": "lacp",
        "active": true,
{% if PORTCHANNEL[pc]['fallback'] and ((PORTCHANNEL[pc]['members'] | length) == 1) %}
        "fallback": {{ PORTCHANNEL[pc]['fallback'] }},
{% else %}
{# Use 75% links upperbound as min-links #}
        "min_ports": {{ (PORTCHANNEL[pc]['members'] | length * 0.75) | round(0, 'ceil') | int }},
{% endif %}
        "tx_hash": ["eth", "ipv4", "ipv6"]
    },
    "link_watch": {
        "name": "ethtool"
    },
    "ports": {
{% for member in PORTCHANNEL[pc]['members'] %}
        "{{ member }}": {}{% if not loop.last %},{% endif %}
{% endfor %}
    }
}

修改后的文件:

{
    "device": "{{ pc }}",
    "hwaddr": "{{ hwaddr }}",
    "runner": {
        "name": "roundrobin",
        "active": true,
{% if PORTCHANNEL[pc]['fallback'] and ((PORTCHANNEL[pc]['members'] | length) == 1) %}
        "fallback": {{ PORTCHANNEL[pc]['fallback'] }},
{% else %}
{# Use 75% links upperbound as min-links #}
        "min_ports": {{ (PORTCHANNEL[pc]['members'] | length * 0.75) | round(0, 'ceil') | int }},
{% endif %}
        "tx_hash": ["eth", "ipv4", "ipv6"]
    },
    "link_watch": {
        "name": "ethtool"
    },
    "ports": {
{% for member in PORTCHANNEL[pc]['members'] %}
        "{{ member }}": {}{% if not loop.last %},{% endif %}
{% endfor %}
    }
}

2.重新加载配置(该步骤会重启docker-teamd)

admin@switch2:~$ sudo config reload -y
Running command: sonic-cfggen -j /etc/sonic/config_db.json --write-to-db
Running command: service hostname-config restart
Running command: service interfaces-config restart
Running command: service ntp-config restart
Running command: service rsyslog-config restart
Running command: service swss restart
Running command: service pmon restart
Running command: service teamd restart

LAG内核信息同步到APP_DB实现机制分析

sonic的team采用的是开源team项目(libteam),详细信息参考:https://github.com/jpirko/libteam

该部分有一个teamsyncd进程用于监听内核的team netlink信息,以及teamd的lag成员端口变化信息,将其同步到app_db.

该部分涉及文件:

teamsyncd.cpp

teamsync.cpp

teamsync.h

TeamSync

class TeamSync : public NetMsg{public:    TeamSync(DBConnector *db, DBConnector *stateDb, Select *select);    /*     * Listens to RTM_NEWLINK and RTM_DELLINK to undestand if there is a new     * team device     * lag变化信息处理回调函数     */    virtual void onMsg(int nlmsg_type, struct nl_object *obj);    class TeamPortSync : public Selectable//lag成员端口信息监听结构    {    public:        enum { MAX_IFNAME = 64 };        TeamPortSync(const std::string &lagName, int ifindex,                     ProducerStateTable *lagMemberTable);        ~TeamPortSync();        int getFd() override;        void readData() override;    protected:        int onChange();        static int teamdHandler(struct team_handle *th, void *arg,                                team_change_type_mask_t type_mask);        static const struct team_change_handler gPortChangeHandler;    private:        ProducerStateTable *m_lagMemberTable;        struct team_handle *m_team;//lag句柄,用于管理lag相关信息,主要是成员端口的管理        std::string m_lagName;        int m_ifindex;        //记录lag中的成员,进行新旧比对        std::map<std::string, bool> m_lagMembers; /* map[ifname] = status (enabled|disabled) */    };protected:    void addLag(const std::string &lagName, int ifindex, bool admin_state,                bool oper_state, unsigned int mtu);//添加lag函数    void removeLag(const std::string &lagName);//删除lag函数private:    Select *m_select;    ProducerStateTable m_lagTable;//lag数据库生产者    ProducerStateTable m_lagMemberTable;//lag成员数据库生产者    Table m_stateLagTable;//lag state 数据库    std::map<std::string, std::shared_ptr<TeamPortSync> > m_teamPorts;//每一个lag对应的成员端口监听对象};

具体函数

/* Taken from drivers/net/team/team.c */#define TEAM_DRV_NAME "team"TeamSync::TeamSync(DBConnector *db, DBConnector *stateDb, Select *select) :    m_select(select),    m_lagTable(db, APP_LAG_TABLE_NAME),//作为appdb的lag_table的生产者    m_lagMemberTable(db, APP_LAG_MEMBER_TABLE_NAME),//作为appdb的lag_member_table的生产者    m_stateLagTable(stateDb, STATE_LAG_TABLE_NAME)//写state表{}void TeamSync::onMsg(int nlmsg_type, struct nl_object *obj){    struct rtnl_link *link = (struct rtnl_link *)obj;    if ((nlmsg_type != RTM_NEWLINK) && (nlmsg_type != RTM_DELLINK))        return;    string lagName = rtnl_link_get_name(link);    /* Listens to LAG messages */    char *type = rtnl_link_get_type(link);    if (!type || (strcmp(type, TEAM_DRV_NAME) != 0))        return;    if (nlmsg_type == RTM_DELLINK)    {        /* Remove LAG ports and delete LAG */        removeLag(lagName);        return;    }    //lag状态变化都会走这里,都是使用RTM_NEWLINK事件通知的    addLag(lagName, rtnl_link_get_ifindex(link),           rtnl_link_get_flags(link) & IFF_UP,           rtnl_link_get_flags(link) & IFF_LOWER_UP,           rtnl_link_get_mtu(link));}void TeamSync::addLag(const string &lagName, int ifindex, bool admin_state,                      bool oper_state, unsigned int mtu){    /* Set the LAG */    std::vector<FieldValueTuple> fvVector;    FieldValueTuple a("admin_status", admin_state ? "up" : "down");    FieldValueTuple o("oper_status", oper_state ? "up" : "down");    FieldValueTuple m("mtu", to_string(mtu));    fvVector.push_back(a);    fvVector.push_back(o);    fvVector.push_back(m);    m_lagTable.set(lagName, fvVector);    SWSS_LOG_INFO("Add %s admin_status:%s oper_status:%s mtu:%d",                   lagName.c_str(), admin_state ? "up" : "down", oper_state ? 
"up" : "down", mtu);    /* Return when the team instance has already been tracked */    if (m_teamPorts.find(lagName) != m_teamPorts.end())        return;    /* Start track the team instance 新接口,启动一个套接口监听该lag的成员变化情况 */    auto sync = make_shared<TeamPortSync>(lagName, ifindex, &m_lagMemberTable);    m_select->addSelectable(sync.get());    m_teamPorts[lagName] = sync;    //在db6(state-db)设置该lag创建成功标志    fvVector.clear();    FieldValueTuple s("state", "ok");    fvVector.push_back(s);    m_stateLagTable.set(lagName, fvVector);}void TeamSync::removeLag(const string &lagName){    /* Delete the LAG */    m_lagTable.del(lagName);    SWSS_LOG_INFO("Remove %s", lagName.c_str());    /* Return when the team instance hasn't been tracked before */    if (m_teamPorts.find(lagName) == m_teamPorts.end())        return;    /* No longer track the current team instance */    m_select->removeSelectable(m_teamPorts[lagName].get());    m_teamPorts.erase(lagName);    m_stateLagTable.del(lagName);//移除成功标志}//lag成员端口变化处理函数const struct team_change_handler TeamSync::TeamPortSync::gPortChangeHandler = {    .func       = TeamSync::TeamPortSync::teamdHandler,    .type_mask  = TEAM_PORT_CHANGE | TEAM_OPTION_CHANGE};TeamSync::TeamPortSync::TeamPortSync(const string &lagName, int ifindex,                                     ProducerStateTable *lagMemberTable) :    m_lagMemberTable(lagMemberTable),    m_lagName(lagName),    m_ifindex(ifindex){    m_team = team_alloc();    if (!m_team)    {        SWSS_LOG_ERROR("Unable to allocated team socket");        throw system_error(make_error_code(errc::address_not_available),                           "Unable to allocated team socket");    }    //libteam初始化函数,该函数进行了大量的回调函数的注册,会自动获取lag中所有的端口到port_list成员列表中    int err = team_init(m_team, ifindex);    if (err) {        team_free(m_team);        m_team = NULL;        SWSS_LOG_ERROR("Unable to init team socket");        throw system_error(make_error_code(errc::address_not_available),                           
"Unable to init team socket");    }    //注册端口变化处理函数,端口信息发生变化后调用gPortChangeHandler    err = team_change_handler_register(m_team, &gPortChangeHandler, this);    if (err) {        team_free(m_team);        m_team = NULL;        SWSS_LOG_ERROR("Unable to register port change event");        throw system_error(make_error_code(errc::address_not_available),                           "Unable to register port change event");    }    /* Sync LAG at first */    onChange();}TeamSync::TeamPortSync::~TeamPortSync(){    if (m_team)    {        team_change_handler_unregister(m_team, &gPortChangeHandler, this);        team_free(m_team);    }}//lag成员端口变化处理函数int TeamSync::TeamPortSync::onChange(){    struct team_port *port;    map<string, bool> tmp_lag_members;    /* Check each port  */    team_for_each_port(port, m_team)//遍历该team的每一个端口    {        uint32_t ifindex;        char ifname[MAX_IFNAME + 1] = {0};        bool enabled;        ifindex = team_get_port_ifindex(port);        /* Skip if interface is not found 获取端口,从这里可以看出,端口没有离开team之前不能删除 */        if (!team_ifindex2ifname(m_team, ifindex, ifname, MAX_IFNAME))        {            SWSS_LOG_INFO("Interface ifindex(%u) is not found", ifindex);            continue;        }        /* Skip the member that is removed from the LAG */        /* 端口已经被移除 */        if (team_is_port_removed(port))        {            continue;        }        /* 获取端口是否使能 */        team_get_port_enabled(m_team, ifindex, &enabled);        //获取每一个使能的端口        tmp_lag_members[string(ifname)] = enabled;    }    /* Compare old and new LAG members and set/del accordingly */    //比较两次事件之间的lag成员变化    for (auto it : tmp_lag_members)    {        //新增端口,或者原来的端口状态发生变化        if (m_lagMembers.find(it.first) == m_lagMembers.end() || it.second != m_lagMembers[it.first])        {            //刷新数据库            string key = m_lagName + ":" + it.first;            vector<FieldValueTuple> v;            FieldValueTuple l("status", it.second ? 
"enabled" : "disabled");            v.push_back(l);            m_lagMemberTable->set(key, v);        }    }    //需要删除的端口。进行删除    for (auto it : m_lagMembers)    {        if (tmp_lag_members.find(it.first) == tmp_lag_members.end())        {            string key = m_lagName + ":" + it.first;            m_lagMemberTable->del(key);        }    }    /* Replace the old LAG members with the new ones */    m_lagMembers = tmp_lag_members;    return 0;}int TeamSync::TeamPortSync::teamdHandler(struct team_handle *team, void *arg,                                         team_change_type_mask_t type_mask){    return ((TeamSync::TeamPortSync *)arg)->onChange();}int TeamSync::TeamPortSync::getFd(){    return team_get_event_fd(m_team);}void TeamSync::TeamPortSync::readData(){    team_handle_events(m_team);}

teamsyncd

int main(int argc, char **argv){    swss::Logger::linkToDbNative("teamsyncd");    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);//连接app_db    DBConnector stateDb(STATE_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);    Select s;    TeamSync sync(&db, &stateDb, &s);    //加入主播组,监听RTM_NEWLINK和RTM_DELLINK事件,lag up/down信息也是通过RTM_NEWLINK传递    NetDispatcher::getInstance().registerMessageHandler(RTM_NEWLINK, &sync);    NetDispatcher::getInstance().registerMessageHandler(RTM_DELLINK, &sync);    while (1)    {        try        {            NetLink netlink;            netlink.registerGroup(RTNLGRP_LINK);            cout << "Listens to teamd events..." << endl;            netlink.dumpRequest(RTM_GETLINK);            s.addSelectable(&netlink);            while (true)            {                Selectable *temps;                s.select(&temps);            }        }        catch (const std::exception& e)        {            cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;            return 0;        }    }    return 1;}

app_db数据示例

127.0.0.1:6379> SELECT 0
127.0.0.1:6379> KEYS *LAG*
1) "LAG_MEMBER_TABLE:PortChannel1:Ethernet4"
2) "LAG_MEMBER_TABLE:PortChannel1:Ethernet0"
3) "LAG_TABLE:PortChannel1"
127.0.0.1:6379> HGETALL "LAG_TABLE:PortChannel1"
1) "admin_status"
2) "up"
3) "oper_status"
4) "up"
5) "mtu"
6) "9100"
127.0.0.1:6379> HGETALL "LAG_MEMBER_TABLE:PortChannel1:Ethernet0"
1) "status"
2) "enabled"
127.0.0.1:6379> 

LAG APP_DB信息同步到ASIC_DB实现机制分析

lag与lag-member相关部分处理是在portsorch中进行处理。

该部分涉及的文件有:

portsorch.cpp

portsorch.h

LAG

/*
 * Drains LAG entries from the consumer. A SET creates the LAG unless it is
 * already in m_portList; a DEL removes it unless it cannot be found. An entry
 * is erased once handled (or unknown) and kept for a later retry when
 * addLag()/removeLag() reports failure.
 */
void PortsOrch::doLagTask(Consumer &consumer)
{
    SWSS_LOG_ENTER();

    for (auto entry = consumer.m_toSync.begin(); entry != consumer.m_toSync.end(); )
    {
        auto &kco = entry->second;

        string lag_alias = kfvKey(kco);
        string op = kfvOp(kco);

        /* true when the entry is finished and can leave the queue */
        bool finished;

        if (op == SET_COMMAND)
        {
            /* A duplicate entry is finished without touching the hardware */
            const bool duplicate = m_portList.find(lag_alias) != m_portList.end();
            finished = duplicate || addLag(lag_alias);
        }
        else if (op == DEL_COMMAND)
        {
            Port lag;

            /* A LAG that cannot be located has nothing to remove */
            finished = !getPort(lag_alias, lag) || removeLag(lag);
        }
        else
        {
            SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
            finished = true;
        }

        if (finished)
            entry = consumer.m_toSync.erase(entry);
        else
            entry++;
    }
}

从上面可以看出,orch没有响应lag的状态变化,会出现使用ifconfig lagname down后,lag仍能够转发报文,不过这种配置不应该出现。

lagmember

/*
 * Drains LAG member entries from the consumer. Keys have the form
 * "<lag alias>:<port alias>". A SET with status "enabled" adds the port to
 * the LAG, any other status removes it, and a DEL removes it. Entries are
 * erased once handled and kept for a later retry when the LAG is not known
 * yet or when addLagMember()/removeLagMember() reports failure.
 */
void PortsOrch::doLagMemberTask(Consumer &consumer)
{
    SWSS_LOG_ENTER();

    auto it = consumer.m_toSync.begin();
    while (it != consumer.m_toSync.end())
    {
        auto &t = it->second;

        /* Retrieve LAG alias and LAG member alias from key */
        string key = kfvKey(t);
        size_t found = key.find(':');

        /*
         * A key in the wrong format can never be processed. Drop it and carry
         * on; leaving it queued would block every entry behind it on each call.
         */
        if (found == string::npos)
        {
            SWSS_LOG_ERROR("Failed to parse %s", key.c_str());
            it = consumer.m_toSync.erase(it);
            continue;
        }

        string lag_alias = key.substr(0, found);
        string port_alias = key.substr(found+1);

        string op = kfvOp(t);

        Port lag, port;

        /* LAG not known yet: keep the entry so it is retried later */
        if (!getPort(lag_alias, lag))
        {
            SWSS_LOG_INFO("Failed to locate LAG %s", lag_alias.c_str());
            it++;
            continue;
        }

        if (!getPort(port_alias, port))
        {
            SWSS_LOG_ERROR("Failed to locate port %s", port_alias.c_str());
            it = consumer.m_toSync.erase(it);
            continue;
        }

        /* Update a LAG member */
        if (op == SET_COMMAND)
        {
            string status;
            for (const auto &i : kfvFieldsValues(t))
            {
                if (fvField(i) == "status")
                    status = fvValue(i);
            }

            /* Sync an enabled member */
            if (status == "enabled")
            {
                /* Duplicate entry: the member is already in the LAG */
                if (lag.m_members.find(port_alias) != lag.m_members.end())
                {
                    it = consumer.m_toSync.erase(it);
                    continue;
                }

                /* Assert the port doesn't belong to any LAG */
                assert(!port.m_lag_id && !port.m_lag_member_id);

                if (addLagMember(lag, port))
                    it = consumer.m_toSync.erase(it);
                else
                    it++;
            }
            /* Sync a disabled member */
            else /* status == "disabled" */
            {
                /* "status" is "disabled" at start when m_lag_id and
                 * m_lag_member_id are absent */
                if (!port.m_lag_id || !port.m_lag_member_id)
                {
                    it = consumer.m_toSync.erase(it);
                    continue;
                }

                /* A disabled member is removed from the LAG */
                if (removeLagMember(lag, port))
                    it = consumer.m_toSync.erase(it);
                else
                    it++;
            }
        }
        /* Remove a LAG member */
        else if (op == DEL_COMMAND)
        {
            /* Assert the LAG member exists */
            assert(lag.m_members.find(port_alias) != lag.m_members.end());

            if (!port.m_lag_id || !port.m_lag_member_id)
            {
                SWSS_LOG_WARN("Member %s not found in LAG %s lid:%lx lmid:%lx,",
                        port.m_alias.c_str(), lag.m_alias.c_str(), lag.m_lag_id, port.m_lag_member_id);
                it = consumer.m_toSync.erase(it);
                continue;
            }

            if (removeLagMember(lag, port))
                it = consumer.m_toSync.erase(it);
            else
                it++;
        }
        else
        {
            SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
            it = consumer.m_toSync.erase(it);
        }
    }
}

asic_db数据示例

127.0.0.1:6379[1]> KEYS *LAG*
1) "ASIC_STATE:SAI_OBJECT_TYPE_LAG_MEMBER:oid:0x1b0000000005e3"
2) "ASIC_STATE:SAI_OBJECT_TYPE_LAG_MEMBER:oid:0x1b0000000005e4"
3) "ASIC_STATE:SAI_OBJECT_TYPE_LAG:oid:0x20000000005d2"
127.0.0.1:6379[1]> HGETALL ASIC_STATE:SAI_OBJECT_TYPE_LAG_MEMBER:oid:0x1b0000000005e4
1) "SAI_LAG_MEMBER_ATTR_LAG_ID"
2) "oid:0x20000000005d2"
3) "SAI_LAG_MEMBER_ATTR_PORT_ID"
4) "oid:0x1000000000003"
127.0.0.1:6379[1]>