sonic orch调度系统之----select

常见的服务器模型有多进程、多线程、IO多路复用、协程等。sonic的核心守护进程orchagent采用的是IO多路复用模型:早期的sonic使用select系统调用实现多路复用,后面的版本改用epoll。sonic用Select类(与系统调用select同名)对底层实现进行了封装,屏蔽了两者的差异。

class Selectable

事件基类,是对通用epoll事件的封装;事件可以是读、写、异常等类型(下文的addSelectable只注册了EPOLLIN可读事件)。真实的事件都从该类派生,比如redis数据库事件:class RedisSelect : public Selectable;netlink事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable;orch执行单元:class Executor : public Selectable;定时器:class SelectableTimer : public Selectable等。

/**
 * Abstract base class for every event source that Select can wait on.
 *
 * A concrete source derives from this class, exposes its file descriptor
 * through getFd() and drains it in readData(). Select (a friend) uses the
 * private "last used" bookkeeping to order Selectables of equal priority.
 */
class Selectable
{
public:
    /**
     * @param pri scheduling priority inside Select; a higher value is a
     *            higher priority.
     */
    Selectable(int pri = 0) : m_priority(pri),
                              m_last_used_time(std::chrono::steady_clock::now())
    {
        lastusedsequence = g_lastusedsequence++;
    }

    virtual ~Selectable() = default;

    /* Return the file descriptor of this Selectable. */
    virtual int getFd() = 0;

    /* Read all data from the fd associated with this Selectable. */
    virtual void readData() = 0;

    /*
     * True if the Selectable still has data in its cache. Select checks this
     * after handing the Selectable out and, when true, puts it back into its
     * ready set.
     */
    virtual bool hasCachedData()
    {
        return false;
    }

    /*
     * True if the Selectable was initialized with data. Select::addSelectable
     * then marks it ready immediately.
     */
    virtual bool initializedWithData()
    {
        return false;
    }

    /* Hook that Select runs after every read; a no-op by default. */
    virtual void updateAfterRead()
    {
    }

    /* Scheduling priority given at construction. */
    int getPri() const
    {
        return m_priority;
    }

private:
    /* Only Select may read and update the "last used" bookkeeping below. */
    friend class Select;

    std::chrono::time_point<std::chrono::steady_clock> getLastUsedTime() const
    {
        return m_last_used_time;
    }

    /* Sequence number taken the last time this Selectable was used. */
    unsigned long getLastUsedsequence() const
    {
        return lastusedsequence;
    }

    /* Refresh both the timestamp and the sequence number. */
    void updateLastUsedTime()
    {
        m_last_used_time = std::chrono::steady_clock::now();
        lastusedsequence = g_lastusedsequence++;
    }

    /* Priority of the Selectable inside Select; higher value is higher priority. */
    int m_priority;

    std::chrono::time_point<std::chrono::steady_clock> m_last_used_time;

    /* Value of g_lastusedsequence when this Selectable was last used. */
    unsigned long lastusedsequence;

    /*
     * Global, monotonically increasing counter shared by all Selectables.
     * It is used to schedule Selectables of the same priority fairly.
     */
    static unsigned long g_lastusedsequence;
};

class Select

class Select{public:    Select();    ~Select();    /* Add object for select 给epoll添加一个事件 */    void addSelectable(Selectable *selectable);    /* Remove object from select 删除一个epoll事件 */    void removeSelectable(Selectable *selectable);    /* Add multiple objects for select 添加多个epoll事件 */    void addSelectables(std::vector<Selectable *> selectables);    enum {//返回的事件类型        OBJECT = 0,        ERROR = 1,        TIMEOUT = 2,    };    //执行epoll     int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max());    int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max());private:    //epoll事件比较函数,通过该函数实现事件的优先级    struct cmp    {        bool operator()(const Selectable* a, const Selectable* b) const        {            /* Choose Selectable with highest priority first */            if (a->getPri() > b->getPri())                return true;            else if (a->getPri() < b->getPri())                return false;            /* if the priorities are equal */            /* use Selectable which was selected later */            if (a->getLastUsedsequence() < b->getLastUsedsequence())                return true;            else if (a->getLastUsedsequence() > b->getLastUsedsequence())                return false;            /* when a == b */            return false;        }    };    //epoll轮询函数    int poll_descriptors(Selectable **c, unsigned int timeout);    int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout);    int m_epoll_fd;//epoll句柄    std::unordered_map<int, Selectable *> m_objects;//监听的事件句柄与其对应的selectable之间的关系    std::set<Selectable *, Select::cmp> m_ready;//已经就绪的事件集合,提供了比较函数,从而实现优先级调度};

Select::Select()

/**
 * Create the epoll instance backing this Select.
 *
 * @throws std::runtime_error when epoll_create1 fails; the message carries
 *         the errno value and its strerror text.
 */
Select::Select()
{
    m_epoll_fd = ::epoll_create1(0);
    if (m_epoll_fd == -1)
    {
        /*
         * Capture errno right away: the string building below allocates and
         * calls library functions that are allowed to modify errno.
         */
        const int err = errno;
        std::string error = std::string("Select::constructor:epoll_create1: error=("
                          + std::to_string(err) + "}:"
                          + strerror(err));
        throw std::runtime_error(error);
    }
}

Select::~Select()

/**
 * Release the epoll instance. The result of close() is deliberately
 * ignored.
 */
Select::~Select()
{
    static_cast<void>(::close(m_epoll_fd));
}

void Select::addSelectable(Selectable *selectable)

/**
 * Register a Selectable so that select() reports it when its fd is readable.
 *
 * A Selectable whose fd is already registered is ignored with a warning.
 * A Selectable that reports initializedWithData() is marked ready at once.
 *
 * @param selectable object to register
 * @throws std::runtime_error when epoll_ctl(EPOLL_CTL_ADD) fails; in that
 *         case the Selectable is not left registered in this Select.
 */
void Select::addSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    if (m_objects.find(fd) != m_objects.end())
    {
        SWSS_LOG_WARN("Selectable is already added to the list, ignoring.");
        return;
    }

    m_objects[fd] = selectable;

    /* Data is already available: make it visible to the next select(). */
    if (selectable->initializedWithData())
    {
        m_ready.insert(selectable);
    }

    /* Watch the descriptor for readability. */
    struct epoll_event ev = {};
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
    if (res == -1)
    {
        /* Save errno before any other call can overwrite it. */
        const int err = errno;

        /*
         * Roll back the bookkeeping done above, otherwise a failed
         * registration would leave a Selectable that epoll never reports
         * and would make a later retry hit the "already added" branch.
         */
        m_ready.erase(selectable);
        m_objects.erase(fd);

        std::string error = std::string("Select::add_fd:epoll_ctl: error=("
                          + std::to_string(err) + "}:"
                          + strerror(err));
        throw std::runtime_error(error);
    }
}

void Select::removeSelectable(Selectable *selectable)

/**
 * Unregister a Selectable: forget its fd, drop it from the ready set and
 * remove the fd from the epoll instance.
 *
 * @param selectable object to unregister
 * @throws std::runtime_error when epoll_ctl(EPOLL_CTL_DEL) fails. The
 *         internal bookkeeping has already been cleared at that point.
 */
void Select::removeSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    m_objects.erase(fd);
    m_ready.erase(selectable);

    /* Stop epoll from watching the descriptor. */
    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
    if (res == -1)
    {
        /*
         * Capture errno right away: the string building below allocates and
         * calls library functions that are allowed to modify errno.
         */
        const int err = errno;
        std::string error = std::string("Select::del_fd:epoll_ctl: error=("
                          + std::to_string(err) + "}:"
                          + strerror(err));
        throw std::runtime_error(error);
    }
}

void Select::addSelectables(vector<Selectable *> selectables)

void Select::addSelectables(vector<Selectable *> selectables){    for(auto it : selectables)//添加多个事件    {        addSelectable(it);    }}

int Select::poll_descriptors(......)

提取一个就绪事件

/**
 * Wait on the epoll instance once and hand out a single ready Selectable.
 *
 * @param c       receives the chosen Selectable when Select::OBJECT is
 *                returned; left untouched otherwise
 * @param timeout forwarded unchanged to epoll_wait
 * @return Select::OBJECT when a Selectable was stored in *c,
 *         Select::ERROR when epoll_wait failed,
 *         Select::TIMEOUT when nothing is ready.
 *
 * NOTE(review): with no registered Selectables maxevents is 0, which
 * epoll_wait(2) documents as EINVAL, so this would return ERROR - confirm
 * that this is the intended behaviour.
 */
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    /*
     * Retry when the wait was interrupted by a signal. Every retry passes
     * the full timeout again.
     */
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    /* Drain every descriptor that fired and queue its Selectable as ready. */
    for (int i = 0; i < ret; ++i)
    {
        const int fd = events[i].data.fd;

        /*
         * Look the descriptor up with find(): operator[] would insert a
         * null entry for an unknown fd and the code below would then
         * dereference it. Events for unknown descriptors are skipped.
         */
        const auto found = m_objects.find(fd);
        if (found == m_objects.end() || found->second == nullptr)
            continue;

        Selectable *sel = found->second;
        sel->readData();
        m_ready.insert(sel);
    }

    if (m_ready.empty())
        return Select::TIMEOUT;

    /* The first element is the highest-priority, least recently used one. */
    Selectable *sel = *m_ready.begin();
    *c = sel;
    m_ready.erase(m_ready.begin());

    /*
     * The sequence number is part of the ordering key of m_ready, so it may
     * only change while the Selectable is outside the set; otherwise the
     * set invariant breaks. Refreshing it moves the Selectable behind its
     * equal-priority peers, which keeps them from starving.
     */
    sel->updateLastUsedTime();

    /* More messages are cached: keep the Selectable ready for the next call. */
    if (sel->hasCachedData())
    {
        m_ready.insert(sel);
    }

    /*
     * NOTE(review): the author of the surrounding write-up believes that
     * calling updateAfterRead() at this point leads to many idle wake-ups
     * in the SONiC Selectable implementations - not verifiable from this
     * file, confirm against the concrete Selectables.
     */
    sel->updateAfterRead();

    return Select::OBJECT;
}

int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)

提取多个就绪事件,该函数是在上面函数基础上的改进。每次只提取一个事件会造成"饿死和胀死"的问题:由于m_ready是按优先级排序的有序集合(std::set),高优先级的事件总是被优先提取;如果高优先级的事件依赖于低优先级的事件,就会出现高优先级业务一直被调度却因缺少依赖条件而无法执行、低优先级业务又始终得不到调度的情况,形成死锁。一次性提取所有就绪事件可以解决这种高低优先级之间的死锁问题。

int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout){    int sz_selectables = static_cast<int>(m_objects.size());    std::vector<struct epoll_event> events(sz_selectables);    int ret;    do    {        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);    }    while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal    if (ret < 0)        return Select::ERROR;    for (int i = 0; i < ret; ++i)    {        int fd = events[i].data.fd;        Selectable* sel = m_objects[fd];        sel->readData();        m_ready.insert(sel);    }        auto iter = m_ready.begin();    while(iter !=m_ready.end())    {        auto sel = *iter;        vc.push_back(sel);        iter = m_ready.erase(iter);        sel->updateLastUsedTime();    }    for(auto se:vc)    {        if (se->hasCachedData())        {            m_ready.insert(se);        }        se->updateAfterRead();    }    if(!vc.empty())    {        return Select::OBJECT;    }        return Select::TIMEOUT;}

int Select::select(Selectable **c, unsigned int timeout)

/**
 * Return one ready Selectable, waiting up to timeout for one to appear.
 *
 * @param c       set to NULL on entry; receives the ready Selectable when
 *                Select::OBJECT is returned
 * @param timeout forwarded to epoll_wait; the maximum unsigned value means
 *                "no timeout", 0 means "do not wait"
 * @return the result of poll_descriptors(): Select::OBJECT, Select::ERROR
 *         or Select::TIMEOUT
 */
int Select::select(Selectable **c, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    *c = nullptr;

    /*
     * "No timeout" is expressed as -1. In an unsigned int that is the same
     * bit pattern as the maximum value; it is presumably narrowed to -1 when
     * poll_descriptors passes it to epoll_wait.
     */
    if (timeout == std::numeric_limits<unsigned int>::max())
    {
        timeout = static_cast<unsigned int>(-1);
    }

    /* First a non-blocking pass: is something ready already? */
    const int immediate = poll_descriptors(c, 0);

    /* Done when there is data, an error, or the caller asked not to wait. */
    const bool nothing_yet = (immediate == Select::TIMEOUT);
    if (!nothing_yet || timeout == 0)
    {
        return immediate;
    }

    /* Nothing ready: block for data. */
    return poll_descriptors(c, timeout);
}

int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)

/**
 * Collect all ready Selectables into vc, waiting up to timeout for at least
 * one to appear.
 *
 * @param vc      output vector filled by poll_descriptors()
 * @param timeout forwarded to epoll_wait; the maximum unsigned value means
 *                "no timeout", 0 means "do not wait"
 * @return the result of poll_descriptors(): Select::OBJECT, Select::ERROR
 *         or Select::TIMEOUT
 */
int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    /*
     * "No timeout" is expressed as -1. In an unsigned int that is the same
     * bit pattern as the maximum value; it is presumably narrowed to -1 when
     * poll_descriptors passes it to epoll_wait.
     */
    if (timeout == std::numeric_limits<unsigned int>::max())
    {
        timeout = static_cast<unsigned int>(-1);
    }

    /* First a non-blocking pass: is something ready already? */
    const int immediate = poll_descriptors(vc, 0);

    /* Done when there is data, an error, or the caller asked not to wait. */
    const bool nothing_yet = (immediate == Select::TIMEOUT);
    if (!nothing_yet || timeout == 0)
    {
        return immediate;
    }

    /* Nothing ready: block for data. */
    return poll_descriptors(vc, timeout);
}