sonic-orch调度系统之orchagent

69次阅读

共计 4619 个字符,预计需要花费 12 分钟才能阅读完成。

​ sonic 核心守护线程 orchagent 以 orch 为单位进行资源管理,一个 orch 包含了一组相似的资源;orchagent 调度系统以 Executor 为调度单位,调度实体有 Consumer,ExecutableTimer 等,本文分析一下 sonic 调度细节。

class OrchDaemon

orchagent 以 OrchDaemon 作为核心类进行描述。

class OrchDaemon
{
public:
    OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
    ~OrchDaemon();

    bool init();// 初始化进程
    void start();// 启动调度系统
private:
    // 连接了三个数据库
    DBConnector *m_applDb;
    DBConnector *m_configDb;
    DBConnector *m_stateDb;
    // 包含所有的 orch
    std::vector<Orch *> m_orchList;
    // 创建的 select 多路异步 IO 控制块
    Select *m_select;
    // 将 asic_db 的 pipe 进行 flush,不在等待。void flush();};

bool OrchDaemon::init()

初始化 orchagent 执行环境。

bool OrchDaemon::init()
{SWSS_LOG_ENTER();

    ......
        
    // 连接数据库
    TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
    TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
    TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);

    vector<TableConnector> acl_table_connectors = {
        confDbAclTable,
        confDbAclRuleTable,
        stateDbLagTable
    };

    ......
    
    // 收集对应的 orch
    m_orchList = {switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, mirror_orch, gAclOrch, gFdbOrch, vrf_orch};
    // 创建多路事件控制块
    m_select = new Select();

    ......
    // 除了上面的 orch 外,其它代码还添加了一些 orch,这里不再列出

    return true;
}

void OrchDaemon::flush()

将 asic_db 的生产者的 pipeline 清空

/* Flush redis through sairedis interface */
void OrchDaemon::flush()
{SWSS_LOG_ENTER();

    sai_attribute_t attr;
    attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
    sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
    if (status != SAI_STATUS_SUCCESS)
    {SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
        exit(EXIT_FAILURE);
    }
}

void OrchDaemon::start()

void OrchDaemon::start()
{SWSS_LOG_ENTER();
    // 遍历每一个 orch,将每一个 orch 中的所有关心的事件加入 epoll 中,基本一个 Executor 为一个事件
    for (Orch *o : m_orchList)
    {m_select->addSelectables(o->getSelectables());
    }
    // 进入 dead loop
    while (true)
    {
        Selectable *s;
        int ret;
        // 进行 epoll 阻塞监听
        ret = m_select->select(&s, SELECT_TIMEOUT);
        // 错误事件,继续监听
        if (ret == Select::ERROR)
        {SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
            continue;
        }
        // 超时事件
        if (ret == Select::TIMEOUT)
        {
            /* Let sairedis to flush all SAI function call to ASIC DB.
             * Normally the redis pipeline will flush when enough request
             * accumulated. Still it is possible that small amount of
             * requests live in it. When the daemon has nothing to do, it
             * is a good chance to flush the pipeline  
             * 确保 redis-pipeline 中的少量请求在 10 秒后能够得到处理,不在积累请求
             */
            flush();
            continue;
        }
        // 获取触发 epoll 事件的 Executor
        auto *c = (Executor *)s;
        // 对于 Consumer 来说,执行数据库操作,将 redis 中的通知转换到 m_tosync 中,并且执行以下 m_tosync 中的 task
        c->execute();

        /* After each iteration, periodically check all m_toSync map to
         * execute all the remaining tasks that need to be retried. */

        /* TODO: Abstract Orch class to have a specific todo list */
        // 执行其它的 orch 中遗留的任务
        for (Orch *o : m_orchList)
            o->doTask();}
}

实例分析

下面我们以 orchagent 处理的最多的事件:ConsumerStateTable 来分析一下 orchagent 调度系统。

ConsumerStateTable 即 orchagent 订阅的 app_db 事件。格式如下所示:

"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8"        #在集合 INTF_TABLE_KEY_SET 中增加一个 key
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" 
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
"PUBLISH" "INTF_TABLE_CHANNEL" "G"     
  • 当生产者发送命令 “PUBLISH” “INTF_TABLE_CHANNEL” “G”,orchagent 从 epoll 中被唤醒,然后从对应的 redis 客户端句柄中调用 RedisSelect::readData() 进行数据读取该命令的应答,应答如下所示:
1) "message"
2) "INTF_TABLE_CHANNEL"
3) "G"
  • 每收到一个应答都会对 RedisSelect::m_queueLength 进行加 1,该应答只有一个信息 ”G”, 只起到通知作用。所以如果生产者同时写入了大量的 app_db 事件,某一个 epoll 唤醒,RedisSelect::readData()可以读出大量的应答,从而导致 m_queueLength 大于 1。
  • 读完数据后将该事件 Selectable 添加到 Select:m_ready 中。同时通过调用 RedisSelect::updateAfterRead()对 RedisSelect::m_queueLength 进行减 1,这里只减去了 1,而没有根据实际处理的情况减去对应的值,这里存在增加多个值减去一个值的情况。只要 m_queueLength 不为 1,就不会将 Selectable 从 Select:m_ready 删除。
  • Select::select 函数会返回 Select:m_ready 中所有的 Selectable。
  • 遍历每一个 selectable,在函数 Consumer::execute()中调用 TableEntryPoppable::pops 使用脚本 consumer_state_table_pops.lua 处理 INTF_TABLE_KEY_SET 中的 key,每次最多处理 128 个 key。
local ret = {}
local keys = redis.call('SPOP', KEYS[1], ARGV[1])-- 一次处理 128 个 key
local n = table.getn(keys)
for i = 1, n do
   local key = keys[i]
   local values = redis.call('HGETALL', KEYS[2] .. key)
   table.insert(ret, {key, values})
end
return ret

脚本返回的内容将会是:

INTF_TABLE:PortChannel1:1.1.1.1/8:{
    "scope":"global",
    "family": "IPv4"
}

pops 然后对上面的内容进行加工成如下格式:

std::tuple<std::string, std::string, std::vector<FieldValueTuple> >

即最后返回:

"SET", "INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global","family": "IPv4">
  • Consumer::execute()会对前面返回的值添加到 Consumer::m_toSync, 这里会进行合并。
  • 最后 OrchDaemon::start()函数会执行每一个 orch 的 Consumer::m_toSync 中的 task。

sonic 调度系统的缺陷

sonic 调度系统过于简单,无法处理大规模,逻辑复杂的业务,效率非常低下。

  1. 当超规格下发配置的时候,会导致 m_toSync 中因为资源不足而驻留大量的 task。OrchDaemon::start()每一次事件触发都会遍历所有 orch 的 m_toSync 中的 task,造成无效 task 频繁执行,引发 orchagent 发生震荡。必须给 Consumer 增加一些标记,表明当前 Consumer 处于资源不足状态,暂不执行 m_toSync 中的 task
  2. 当大规模下发配置的时候,如果本配置的依赖还没有下发,会导致 m_toSync 中因为依赖不满足而驻留大量的 task。引发 orchagent 震荡问题。
  3. 当大规模下删除配置的时候,如果本配置引用还没有删除,会导致 m_toSync 中因为被引用不能删除而驻留大量的 task。引发 orchagent 震荡问题。
  4. orchagent 在处理 task 的时候没有考虑事件处理顺序,应该首先执行 DEL 操作,然后再执行 SET 操作。对于 DEL 操作,应该先删除低优先级的 Consumer,对于 SET 操作来说,应该先处理高优先级的 Consumer。两者顺序相反。

正文完
 0