共计 7503 个字符,预计需要花费 19 分钟才能阅读完成。
SONiC orch 调度系统之 —— Select
常见的服务器模型有多进程、多线程、IO 多路复用、协程等。SONiC 的核心守护进程 orchagent 采用的是 IO 多路复用模型:早期的 SONiC 使用 select 实现多路复用,后续版本改用 epoll。代码中用 Select 类(与多路复用系统调用 select 同名)对底层进行了封装,屏蔽了两者的差异。
class Selectable
事件基类,描述了 epoll 事件,可以是读,写,异常等事件。该结构对通用 epoll 事件进行了封装,真实事件通过该类派生出来,比如 redis 数据库事件:class RedisSelect : public Selectable;netlink 事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable,orch 执行单元:class Executor : public Selectable,定时器:class SelectableTimer : public Selectable 等。
/*
 * Abstract base class for an event source that a Select object can wait on.
 * Concrete sources derive from it and supply the file descriptor plus the
 * read logic; Select (a friend) drives the scheduling bookkeeping.
 */
class Selectable
{
public:
    /*
     * pri: priority of this Selectable inside Select (higher value wins).
     * Every new object also takes the next value of the global sequence
     * counter as its initial "last used" stamp.
     */
    Selectable(int pri = 0)
        : m_priority(pri),
          m_last_used_time(std::chrono::steady_clock::now()),
          lastusedsequence(g_lastusedsequence++)
    {
    }

    virtual ~Selectable() = default;

    /* Return the file descriptor backing this Selectable. */
    virtual int getFd() = 0;

    /* Read all data from the fd associated with this Selectable. */
    virtual void readData() = 0;

    /*
     * True if the Selectable still holds data in its cache; Select then
     * puts it back into the ready set. The default reports no cached data.
     */
    virtual bool hasCachedData()
    {
        return false;
    }

    /*
     * True if the Selectable was initialized with data, i.e. it must be
     * treated as ready right after being added. The default reports false.
     */
    virtual bool initializedWithData()
    {
        return false;
    }

    /* Hook run by Select after every read; the default does nothing. */
    virtual void updateAfterRead()
    {
    }

    /* Priority given at construction time. */
    int getPri() const
    {
        return m_priority;
    }

private:
    // Only Select may read or refresh the "last used" bookkeeping below.
    friend class Select;

    std::chrono::steady_clock::time_point getLastUsedTime() const
    {
        return m_last_used_time;
    }

    /* Sequence number recorded the last time this object was used. */
    unsigned long getLastUsedsequence() const
    {
        return lastusedsequence;
    }

    /*
     * Refresh both stamps: the wall-clock-independent time point and the
     * sequence number (despite the name, the sequence is updated as well).
     */
    void updateLastUsedTime()
    {
        m_last_used_time = std::chrono::steady_clock::now();
        lastusedsequence = g_lastusedsequence++;
    }

    // Priority of the Selectable inside Select; higher value is higher priority.
    int m_priority;

    std::chrono::steady_clock::time_point m_last_used_time;

    // Sequence number taken at the last use of this object.
    unsigned long lastusedsequence;

    // Global, monotonically advancing sequence source; it orders Selectables
    // of equal priority so that they are scheduled fairly.
    static unsigned long g_lastusedsequence;
};
class Select
/*
 * Priority-aware multiplexer built on top of epoll. Selectables are
 * registered by file descriptor; select() hands back ready Selectables
 * ordered by priority, then by how long ago they were last served.
 */
class Select
{
public:
    Select();
    ~Select();

    /* Add one object to the set being watched. */
    void addSelectable(Selectable *selectable);

    /* Remove one object from the set being watched. */
    void removeSelectable(Selectable *selectable);

    /* Add several objects to the set being watched. */
    void addSelectables(std::vector<Selectable *> selectables);

    /* Result codes returned by select(). */
    enum {
        OBJECT = 0,   /* at least one ready Selectable was handed out */
        ERROR = 1,    /* the underlying epoll wait failed */
        TIMEOUT = 2,  /* nothing became ready */
    };

    /* Wait for a single ready Selectable; the default timeout is the maximum unsigned value. */
    int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max());

    /* Wait and collect every ready Selectable into vc. */
    int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max());

private:
    /*
     * Strict ordering of the ready set: a higher priority sorts first;
     * among equal priorities the Selectable with the smaller last-used
     * sequence number (the one served least recently) sorts first.
     */
    struct cmp
    {
        bool operator()(const Selectable* a, const Selectable* b) const
        {
            const int pri_a = a->getPri();
            const int pri_b = b->getPri();
            if (pri_a != pri_b)
            {
                return pri_a > pri_b;
            }

            /* Equal priority: compare sequence numbers; equal ones mean a == b. */
            return a->getLastUsedsequence() < b->getLastUsedsequence();
        }
    };

    /* Poll epoll once and extract one / all ready Selectables. */
    int poll_descriptors(Selectable **c, unsigned int timeout);
    int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout);

    /* epoll instance handle. */
    int m_epoll_fd;

    /* Watched file descriptor -> owning Selectable. */
    std::unordered_map<int, Selectable *> m_objects;

    /* Selectables that are ready, kept sorted by cmp for priority scheduling. */
    std::set<Selectable *, Select::cmp> m_ready;
};
Select::Select()
/*
 * Create the epoll instance used by this Select.
 *
 * Throws std::runtime_error carrying the errno value and its text when
 * epoll_create1() fails.
 */
Select::Select()
{
    m_epoll_fd = ::epoll_create1(0);
    if (m_epoll_fd == -1)
    {
        // Snapshot errno once: the string concatenation below may allocate,
        // and the operands of '+' are not sequenced, so reading errno twice
        // could report two different values.
        const int err = errno;

        std::string error = std::string("Select::constructor:epoll_create1: error=(")
                          + std::to_string(err) + "):"
                          + strerror(err);
        throw std::runtime_error(error);
    }
}
Select::~Select()
/*
 * Release the epoll instance. The result of close() is discarded on
 * purpose: a destructor has no way to report the failure.
 */
Select::~Select()
{
    static_cast<void>(::close(m_epoll_fd));
}
void Select::addSelectable(Selectable *selectable)
/*
 * Start watching a Selectable for readability.
 *
 * A Selectable whose fd is already registered is ignored with a warning.
 * If the Selectable reports initializedWithData(), it is placed in the
 * ready set immediately.
 *
 * Throws std::runtime_error when epoll_ctl(EPOLL_CTL_ADD) fails; in that
 * case the Select object is left exactly as it was before the call.
 */
void Select::addSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    // Single lookup: emplace() refuses to overwrite an existing entry.
    const auto inserted = m_objects.emplace(fd, selectable);
    if (!inserted.second)
    {
        SWSS_LOG_WARN("Selectable is already added to the list, ignoring.");
        return;
    }

    // Register for "readable" events. Value-initialize, then assign the
    // fields: nested designated initializers are not standard before C++20.
    struct epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;

    if (::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
        // Snapshot errno before anything else can overwrite it.
        const int err = errno;

        // Roll back so a failed registration leaves no stale bookkeeping.
        m_objects.erase(inserted.first);

        std::string error = std::string("Select::add_fd:epoll_ctl: error=(")
                          + std::to_string(err) + "):"
                          + strerror(err);
        throw std::runtime_error(error);
    }

    // Only a successfully registered Selectable may enter the ready set.
    if (selectable->initializedWithData())
    {
        m_ready.insert(selectable);
    }
}
void Select::removeSelectable(Selectable *selectable)
/*
 * Stop watching a Selectable.
 *
 * The Selectable is dropped from the fd map and from the ready set first,
 * then its fd is removed from the epoll instance.
 *
 * Throws std::runtime_error when epoll_ctl(EPOLL_CTL_DEL) fails; the
 * bookkeeping has already been cleared at that point.
 */
void Select::removeSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    m_objects.erase(fd);
    m_ready.erase(selectable);

    if (::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == -1)
    {
        // Snapshot errno once: building the message may allocate, and the
        // operands of '+' are not sequenced relative to each other.
        const int err = errno;

        std::string error = std::string("Select::del_fd:epoll_ctl: error=(")
                          + std::to_string(err) + "):"
                          + strerror(err);
        throw std::runtime_error(error);
    }
}
void Select::addSelectables(vector<Selectable *> selectables)
/*
 * Register every Selectable of the given list, in list order, by
 * forwarding each one to addSelectable().
 */
void Select::addSelectables(vector<Selectable *> selectables)
{
    for (auto cur = selectables.begin(); cur != selectables.end(); ++cur)
    {
        addSelectable(*cur);
    }
}
int Select::poll_descriptors(……)
提取一个就绪事件
/*
 * Poll epoll once and hand out a single ready Selectable.
 *
 * c:       receives the chosen Selectable when OBJECT is returned.
 * timeout: forwarded unchanged to epoll_wait().
 *
 * Returns Select::ERROR when epoll_wait() fails for a reason other than
 * EINTR, Select::OBJECT when a Selectable was stored in *c, and
 * Select::TIMEOUT when nothing is ready.
 */
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
    // One event slot per registered descriptor.
    // NOTE(review): with no registered Selectable this passes maxevents == 0,
    // which epoll_wait() rejects, so ERROR is returned - confirm that is the
    // intended behavior for an empty Select.
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    // Wait for events; retry when the wait was interrupted by a signal.
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    // Drain every descriptor that fired and mark its Selectable as ready.
    for (int i = 0; i < ret; ++i)
    {
        int fd = events[i].data.fd;
        Selectable *sel = m_objects[fd];
        sel->readData();
        m_ready.insert(sel);
    }

    if (m_ready.empty())
        return Select::TIMEOUT;

    // The ready set is ordered by cmp, so begin() is the best candidate.
    Selectable *sel = *m_ready.begin();
    *c = sel;
    m_ready.erase(sel);

    // The stamp is part of the ordering key of m_ready, so it may only be
    // refreshed while the Selectable is outside the set; otherwise the set
    // invariant breaks. Refreshing it pushes this Selectable behind its
    // equal-priority peers, which keeps them from starving.
    sel->updateLastUsedTime();

    // Still has cached messages: keep it ready for the next call.
    if (sel->hasCachedData())
    {
        m_ready.insert(sel);
    }

    // Run the post-read hook, as the vector overload does for every
    // Selectable it returns. In the original listing this call had been
    // swallowed by the preceding '//' comment and therefore never ran.
    // NOTE(review): the accompanying write-up suspects that evaluating
    // hasCachedData() before this hook can leave an already-drained
    // Selectable in m_ready and cause empty wake-ups - confirm against the
    // concrete Selectable implementations.
    sel->updateAfterRead();

    return Select::OBJECT;
}
int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)
提取多个就绪事件,该函数是在上面函数基础上的改进。只提取一个事件会造成“饿死和胀死”的问题:由于 m_ready 是有序集合,高优先级的事件总是会被优先提取;如果高优先级的事件依赖于低优先级事件,高优先级的业务会一直被调度,却因缺少依赖条件而无法执行,而低优先级业务始终得不到调度,形成死锁。一次性提取所有就绪事件可以解决这种高低优先级之间的死锁问题。
/*
 * Poll epoll once and hand out every ready Selectable.
 *
 * vc:      ready Selectables are appended in ready-set order; entries that
 *          are already present are left untouched and are not counted.
 * timeout: forwarded unchanged to epoll_wait().
 *
 * Returns Select::ERROR when epoll_wait() fails for a reason other than
 * EINTR, Select::OBJECT when this call appended at least one Selectable,
 * and Select::TIMEOUT otherwise.
 */
int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)
{
    // One event slot per registered descriptor.
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    // Wait for events; retry when the wait was interrupted by a signal.
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while (ret == -1 && errno == EINTR);

    if (ret < 0)
        return Select::ERROR;

    // Drain every descriptor that fired and mark its Selectable as ready.
    for (int i = 0; i < ret; ++i)
    {
        int fd = events[i].data.fd;
        Selectable *sel = m_objects[fd];
        sel->readData();
        m_ready.insert(sel);
    }

    // Remember where this call's results start, so that anything the caller
    // left in vc is neither re-processed nor mistaken for a fresh event.
    const std::vector<Selectable *>::size_type first_new = vc.size();

    // Phase 1: move the whole ready set into vc. The stamp is refreshed only
    // after the Selectable left the set, because it is part of the set's
    // ordering key.
    for (auto iter = m_ready.begin(); iter != m_ready.end(); )
    {
        Selectable *sel = *iter;
        vc.push_back(sel);
        iter = m_ready.erase(iter);
        sel->updateLastUsedTime();
    }

    // Phase 2: re-queue Selectables that still hold cached data and run the
    // post-read hook. This is a separate pass on purpose: re-inserting while
    // phase 1 is still iterating m_ready could revisit the same Selectable.
    for (auto i = first_new; i < vc.size(); ++i)
    {
        Selectable *sel = vc[i];
        if (sel->hasCachedData())
        {
            m_ready.insert(sel);
        }
        sel->updateAfterRead();
    }

    return (vc.size() > first_new) ? Select::OBJECT : Select::TIMEOUT;
}
int Select::select(Selectable **c, unsigned int timeout)
/*
 * Return one ready Selectable through c.
 *
 * A zero-timeout poll is tried first so that already pending data is
 * returned without blocking; only if that reports TIMEOUT (and the caller
 * asked for a non-zero timeout) does the call wait for up to timeout.
 * *c is reset to null before polling.
 */
int Select::select(Selectable **c, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    *c = nullptr;

    // The maximum unsigned value is the default timeout; it is stored as
    // (unsigned)-1 and forwarded as-is to the poll step.
    if (timeout == std::numeric_limits<unsigned int>::max())
    {
        timeout = static_cast<unsigned int>(-1);
    }

    /* First pass: pick up whatever is available right now. */
    const int immediate = poll_descriptors(c, 0);

    /* Done if there was data, an error, or the caller did not want to wait. */
    const bool must_wait = (immediate == Select::TIMEOUT) && (timeout != 0);
    if (!must_wait)
    {
        return immediate;
    }

    /* Second pass: block until data arrives or the timeout expires. */
    return poll_descriptors(c, timeout);
}
int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
/*
 * Collect every ready Selectable into vc.
 *
 * A zero-timeout poll is tried first so that already pending data is
 * returned without blocking; only if that reports TIMEOUT (and the caller
 * asked for a non-zero timeout) does the call wait for up to timeout.
 */
int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    // The maximum unsigned value is the default timeout; it is stored as
    // (unsigned)-1 and forwarded as-is to the poll step.
    if (timeout == std::numeric_limits<unsigned int>::max())
    {
        timeout = static_cast<unsigned int>(-1);
    }

    /* First pass: pick up whatever is available right now. */
    const int immediate = poll_descriptors(vc, 0);

    /* Done if there was data, an error, or the caller did not want to wait. */
    const bool must_wait = (immediate == Select::TIMEOUT) && (timeout != 0);
    if (!must_wait)
    {
        return immediate;
    }

    /* Second pass: block until data arrives or the timeout expires. */
    return poll_descriptors(vc, timeout);
}
正文完