SONiC orchagent 线程调度的最小单位是 Consumer。Consumer 是在 epoll 事件源 Selectable 基础上的进一步封装,每发生一次 epoll 事件,orchagent 就会进行一次调度。Orch 是一类资源的集合,一个 Orch 可以包含多个 Consumer,比如 AclOrch 会同时监听多张 redis 表。
class Executor
// Design assumption
// 1. one Orch can have one or more Executor
// 2. one Executor must belong to one and only one Orch
// 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor
// 设计假设:// 1. 一个 orch 可以拥有一个或者多个 Executor
// 2. 一个 Executor 必须属于一个 orch 而且仅仅属于一个 orch
// 3. Executor 有一个指针指向一个 new 出来的 Selectable 结构,必须在析构函数中将其删除,否则会泄漏
class Executor : public Selectable
{
public:
Executor(Selectable *selectable, Orch *orch)
: m_selectable(selectable)
, m_orch(orch)
{ }
virtual ~Executor() { delete m_selectable;}
// Decorating Selectable
int getFd() override { return m_selectable->getFd(); }
void readData() override { m_selectable->readData(); }
bool hasCachedData() override { return m_selectable->hasCachedData(); }
bool initializedWithData() override { return m_selectable->initializedWithData(); }
void updateAfterRead() override { m_selectable->updateAfterRead(); }
Orch * getorch() { return m_orch;}
// Disable copying
Executor(const Executor&) = delete;
Executor& operator=(const Executor&) = delete;
// Execute on event happening
// execute 执行事件,drain 是一个辅助函数
virtual void execute() {}
virtual void drain() {}
protected:
Selectable *m_selectable;// 指向 new 出来的 Selectable
Orch *m_orch;// 指向一个 orch
// Get the underlying selectable 获取指向的 Selectable
Selectable *getSelectable() const { return m_selectable;}
};
class Executor 只是一个中间的派生类,orch 直接使用的是 class Consumer 和 class ExecutableTimer。
class Consumer
Consumer 类一般用于处理 APPL_DB 中表的订阅事件(从下文的 addConsumer 可以看到,CONFIG_DB 和 STATE_DB 的表同样通过 Consumer 订阅);对于 ASIC_DB,则一般用于处理 syncd 的应答事件。
// One (field, value) pair of a table entry.
using FieldValueTuple = std::pair<std::string, std::string>;
#define fvField std::get<0>
#define fvValue std::get<1>
// One (key, operation, list of field/value pairs) tuple read from a table.
using KeyOpFieldsValuesTuple = std::tuple<std::string, std::string, std::vector<FieldValueTuple> >;
#define kfvKey std::get<0>
#define kfvOp std::get<1>
#define kfvFieldsValues std::get<2>
// Pending tasks of a Consumer, indexed by entry key.
using SyncMap = std::map<std::string, KeyOpFieldsValuesTuple>;
class Consumer : public Executor {
public:
Consumer(TableConsumable *select, Orch *orch)
: Executor(select, orch)
{ }
TableConsumable *getConsumerTable() const
{return static_cast<TableConsumable *>(getSelectable());
}
string getTableName() const
{return getConsumerTable()->getTableName();}
// 事物执行
void execute();
void drain();
/* Store the latest 'golden' status */
// TODO: hide?
SyncMap m_toSync;
};
void Consumer::execute()
epoll 事件触发后调用该函数:它通过 pops 从 redis 表中批量读取所有待处理的 (key, op, field-value) 数据,与 m_toSync 中同一 key 尚未处理完的任务合并后存放在 m_toSync 中,最后调用 drain() 交给 orch 处理。
void Consumer::execute()
{SWSS_LOG_ENTER();
std::deque<KeyOpFieldsValuesTuple> entries;
// 调用 pops 函数,从 redis 数据库中读取数据,返回 KeyOpFieldsValuesTuple 结构
getConsumerTable()->pops(entries);
/* Nothing popped */
if (entries.empty())
{return;}
// 遍历每一个事件
for (auto& entry: entries)
{string key = kfvKey(entry);
string op = kfvOp(entry);
/* Record incoming tasks 记录事件 */
if (gSwssRecord)
{Orch::recordTuple(*this, entry);
}
/* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
// 在这里进行一次合并,对于删除事件,直接覆盖
if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
{m_toSync[key] = entry;
}
/* If an old task is still there, we combine the old task with new task */
/* */
else
{KeyOpFieldsValuesTuple existing_data = m_toSync[key];
auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);
// 遍历每一个新的值
for (auto it : new_values)
{string field = fvField(it);
string value = fvValue(it);
auto iu = existing_values.begin();
while (iu != existing_values.end())// 遍历每一个旧的值
{string ofield = fvField(*iu);
if (field == ofield)// 相同的域,将老的值覆盖,这里应该跳出 while,代码效率较差
iu = existing_values.erase(iu);
else
iu++;
}
/* 将新的值添加进去 */
existing_values.push_back(FieldValueTuple(field, value));
}
m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}
// 执行所有整理好的任务。drain();}
假设有一个 task 的键值对如下:
key=test;op=set;value={
A:a,
B:b,
C:c,
}
第一次触发任务是在 APP_DB 中写入了:
key=test;op=set;value={
A:a,
B:b
}
假如 orchagent 只是将该任务读取到了 m_toSync 中,却由于某种原因没有执行完(例如处理函数返回失败、需要稍后重试),该任务会继续驻留在 m_toSync 中。随后第二次写入了:
key=test;op=set;value={
B:b1,
C:c
}
那么经过 execute 函数后 m_toSync 中将会是:
key=test;op=set;value={
A:a,
B:b1,
C:c
}
void Consumer::drain()
如果 m_toSync 中有待处理的任务,就调用所属 orch 的 doTask(Consumer&) 执行它们;m_toSync 为空时什么也不做。
// Hand all pending tasks in m_toSync to the owning Orch's doTask(Consumer&).
// Does nothing when nothing is pending.
void Consumer::drain()
{
    if (m_toSync.empty())
    {
        return;
    }

    m_orch->doTask(*this);
}
class Orch
class Orch
{
public:
// Each Orch connects to one or more databases and subscribes to tables there;
// every subscription carries a priority for the events it produces.
// Subscribe to a single table on db, with priority pri (default_orch_pri if omitted).
Orch(DBConnector *db, const string tableName, int pri = default_orch_pri);
// Subscribe to several tables on db, all with the default priority.
Orch(DBConnector *db, const vector<string> &tableNames);
// Subscribe to several tables on db, each with its own priority.
Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri);
// Subscribe to tables that may live in different databases (one TableConnector each).
Orch(const vector<TableConnector>& tables);
virtual ~Orch();
// Returns the Selectables this Orch wants polled (presumably one per executor
// in m_consumerMap; definition not shown - confirm).
vector<Selectable*> getSelectables();
/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
// Calls drain() on every executor, so each Consumer's pending m_toSync tasks
// are processed again.
void doTask();
/* Run doTask against a specific executor */
// Tasks can come from a Consumer, a NotificationConsumer or a SelectableTimer;
// only the Consumer overload must be implemented, the others default to no-ops.
virtual void doTask(Consumer &consumer) = 0;
virtual void doTask(NotificationConsumer &consumer) { }
virtual void doTask(SelectableTimer &timer) { }
/* TODO: refactor recording */
static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
protected:
// Executors of this Orch keyed by name; for table consumers the name is the
// table name (see addConsumer).
ConsumerMap m_consumerMap;
// Recording / debugging helpers.
static void logfileReopen();
string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
// Helpers for parsing field values and resolving references to other objects
// (definitions not shown).
ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&);
bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name);
ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&);
/* Note: consumer will be owned by this class */
// Registers an executor under executorName in m_consumerMap; used by addConsumer.
void addExecutor(string executorName, Executor* executor);
Executor *getExecutor(string executorName);
private:
// Creates a Consumer for tableName on db and registers it via addExecutor.
void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri);
};
void Orch::addConsumer(……)
// Register `executor` in m_consumerMap under `executorName`.
// Names must be unique within an Orch. When the name is already present,
// emplace() keeps the existing entry and does not insert the new one, so
// the new executor would never be drained. Report that instead of
// silently ignoring it.
// NOTE(review): whether a rejected executor is released depends on
// ConsumerMap's mapped type, which is declared elsewhere. Confirm.
void Orch::addExecutor(string executorName, Executor* executor)
{
    auto inserted = m_consumerMap.emplace(std::piecewise_construct,
            std::forward_as_tuple(executorName),
            std::forward_as_tuple(executor));

    if (!inserted.second)
    {
        SWSS_LOG_ERROR("Duplicated executorName in m_consumerMap: %s", executorName.c_str());
    }
}
// Create a Consumer for `tableName` on `db` and register it under that name.
// CONFIG_DB and STATE_DB tables are read through a SubscriberStateTable using
// the default pop batch size. Tables in any other database use a
// ConsumerStateTable with the global batch size gBatchSize.
void Orch::addConsumer(DBConnector *db, string tableName, int pri)
{
    const bool useSubscriber = db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB;

    TableConsumable *table;
    if (useSubscriber)
    {
        table = new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri);
    }
    else
    {
        table = new ConsumerStateTable(db, tableName, gBatchSize, pri);
    }

    addExecutor(tableName, new Consumer(table, this));
}
void Orch::doTask(……)
对本 orch 中的每一个 Executor 调用 drain(),执行各个 Consumer 的 m_toSync 中的 task,不管该 task 是本次从 redis 中读取的,还是以前未处理完毕的。
// Call drain() on every executor in m_consumerMap. For a Consumer this
// processes whatever is still pending in its m_toSync, whether it was just
// read or left over from an earlier round.
void Orch::doTask()
{
    for (auto entry = m_consumerMap.begin(); entry != m_consumerMap.end(); ++entry)
    {
        entry->second->drain();
    }
}
class Orch2
Orch2 是在 Orch 基础上的进一步封装:它用 Request 对象解析每个任务,再按 SET/DEL 分别交给 addOperation/delOperation 处理,使具体 orch 的代码更易读。
class Orch2 : public Orch
{
public:
Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri)
: Orch(db, tableName, pri), request_(request)
{ }
protected:
virtual void doTask(Consumer& consumer);
virtual bool addOperation(const Request& request)=0;
virtual bool delOperation(const Request& request)=0;
private:
Request& request_;
};
void Orch2::doTask
// Process every pending entry of `consumer`: parse it with request_, then
// dispatch SET to addOperation() and DEL to delOperation().
// An entry stays in m_toSync only when its handler returns false. Entries
// that fail to parse, carry an unknown operation, or throw are logged and
// removed.
void Orch2::doTask(Consumer &consumer)
{
    SWSS_LOG_ENTER();

    auto &pending = consumer.m_toSync;
    for (auto it = pending.begin(); it != pending.end(); )
    {
        // Drop by default; only a handler returning false keeps the entry.
        bool drop = true;

        try
        {
            request_.parse(it->second);
            auto op = request_.getOperation();

            if (op == SET_COMMAND)
            {
                drop = addOperation(request_);
            }
            else if (op == DEL_COMMAND)
            {
                drop = delOperation(request_);
            }
            else
            {
                SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str());
            }
        }
        catch (const std::invalid_argument& e)
        {
            SWSS_LOG_ERROR("Parse error: %s", e.what());
        }
        catch (const std::logic_error& e)
        {
            SWSS_LOG_ERROR("Logic error: %s", e.what());
        }
        catch (const std::exception& e)
        {
            SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what());
        }
        catch (...)
        {
            SWSS_LOG_ERROR("Unknown exception was catched in the request parser");
        }

        // Reset the shared parser before the next entry.
        request_.clear();

        if (!drop)
        {
            ++it;
            continue;
        }

        it = pending.erase(it);
    }
}