共计 7028 个字符,预计需要花费 18 分钟才能阅读完成。
本章节主要分析 sonic 使用 redis 的键空间消息(keyspace notification)机制实现的消息传递框架。该机制与发布-订阅机制的区别在于:发布者不需要显式执行 publish 通知,只要往数据库中写入指定的键,redis 就会通知监听了该键空间的客户端(前提是 redis 已通过 notify-keyspace-events 配置开启键空间通知)。该机制目前只用于监听 config_db 中配置的变化,并将其同步到 app_db。使用该机制的案例有:VlanMgr、IntfMgr、portsyncd 等;既可以通过 orch 包装使用(比如 VlanMgr),也可以直接定义 SubscriberStateTable 表(比如 portCfg)。
redis 键空间事件机制样例
# 在数据库 0 中订阅以 tom 开头的键
127.0.0.1:6379> PSUBSCRIBE __keyspace@0__:tom*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyspace@0__:tom*"
3) (integer) 1
# 在数据库 0 中添加 hash 表 tom|1(HMSET 触发的键空间事件名为 hset)
127.0.0.1:6379> HMSET tom|1 name tom age 28
OK
#订阅者得到应答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "hset"
#删除 key
127.0.0.1:6379> DEL tom|1
(integer) 1
127.0.0.1:6379>
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "del"
键空间消息机制实现
/*
 * SubscriberStateTable: a consumer that learns about changes to one table via
 * redis keyspace notifications (PSUBSCRIBE on "__keyspace@<db>__:<table><sep>*")
 * rather than an explicit PUBLISH from the producer. Notifications carry only
 * the key and the event name, so for non-"del" events the current field/value
 * pairs are re-read from the database through m_table.
 */
class SubscriberStateTable : public ConsumerTableBase
{
public:
/* Subscribes to the table's keyspace, then loads every existing key into
 * m_buffer as a SET entry (initial snapshot). */
SubscriberStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
/* Get all elements available: the pending initial snapshot if any, otherwise
 * every buffered keyspace event converted to a KeyOpFieldsValuesTuple. */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
/* Read keyspace events from redis into m_keyspace_event_buffer */
void readData() override;
/* True when more than one entry is still buffered (snapshot or events). */
bool hasCachedData() override;
/* True while m_buffer (filled by the constructor's snapshot) is non-empty. */
bool initializedWithData() override
{return !m_buffer.empty();
}
private:
/* Pop the oldest keyspace event from the event buffer; returns nullptr when
 * the buffer is empty. The returned shared_ptr owns the reply. */
std::shared_ptr<RedisReply> popEventBuffer();
/* Keyspace pattern passed to PSUBSCRIBE; also used to validate the pattern
 * field of every incoming pmessage. */
std::string m_keyspace;
// Raw keyspace-notification replies read by readData() and not yet
// converted by pops().
std::deque<std::shared_ptr<RedisReply>> m_keyspace_event_buffer;
// Accessor for the subscribed table: enumerates existing keys at construction
// and fetches the field/value pairs of a key for non-"del" events.
Table m_table;
};
class Table
/*
 * Table: direct read/write access to the entries of one redis table, whose
 * keys have the form "<table><sep><key>", sent through a RedisPipeline.
 */
class Table : public TableBase, public TableEntryEnumerable {
public:
/* Construct from a DB connection (implementation not shown in this excerpt). */
Table(DBConnector *db, const std::string &tableName);
/* Construct on top of an existing pipeline. This Table does not own the
 * pipeline (m_pipeowned is false); `buffered` initializes m_buffered. */
Table(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
~Table() override;
/* Set an entry in the DB directly (op not in use) */
virtual void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX);
/* Delete an entry in the table */
virtual void del(const std::string &key,
const std::string &op = "",
const std::string &prefix = EMPTY_PREFIX);
/* Read all field/value pairs of `key` directly from the DB (HGETALL).
 * Returns false if the key doesn't exist (empty reply). */
virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);
/* Collect the table's keys into `keys` (implementation not shown here). */
void getKeys(std::vector<std::string> &keys);
/* NOTE(review): presumably toggles pipeline buffering of writes - confirm. */
void setBuffered(bool buffered);
/* NOTE(review): presumably flushes buffered pipeline commands - confirm. */
void flush();
void dump(TableDump &tableDump);
protected:
/* Buffered-mode flag, taken from the constructor argument. */
bool m_buffered;
/* Whether this Table owns m_pipe; false for the pipeline-taking constructor. */
bool m_pipeowned;
/* Pipeline used to send commands (e.g. HGETALL in get()). */
RedisPipeline *m_pipe;
/* Strip special symbols from keys used for type identification.
 * Applied to every field name returned by get().
 * Input example:
 * port@
 * DB entry:
 * 1) "ports@"
 * 2) "Ethernet0,Ethernet4,...
 * */
std::string stripSpecialSym(const std::string &key);
};
Table::Table
Table::Table(RedisPipeline *pipeline, const string &tableName, bool buffered)
: TableBase(pipeline->getDbId(), tableName)
, m_buffered(buffered)
, m_pipeowned(false)
, m_pipe(pipeline)
{}
Table::get
/*
 * Read every field/value pair of `key` with HGETALL.
 *
 *   127.0.0.1:6379[4]> HGETALL "VLAN|Vlan1000"
 *   1) "vlanid"
 *   2) "1000"
 *
 * @param key     table key; getKeyName() presumably prepends the table name.
 * @param values  cleared, then filled with (field, value) pairs. Field names
 *                are passed through stripSpecialSym().
 * @return false when the reply is empty (e.g. the key does not exist),
 *         true otherwise.
 * @throws std::system_error if the reply has an odd number of elements.
 *         The error code is unchanged from earlier versions so existing
 *         callers keep working.
 */
bool Table::get(const string &key, vector<FieldValueTuple> &values)
{
    RedisCommand hgetall_key;
    hgetall_key.format("HGETALL %s", getKeyName(key).c_str());
    RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY);
    redisReply *reply = r.getContext();

    values.clear();
    if (reply->elements == 0)
        return false;

    // HGETALL returns a flat field, value, field, value, ... array, so a
    // well-formed reply always has an even number of elements.
    if (reply->elements % 2 != 0)
        throw system_error(make_error_code(errc::address_not_available),
                           "Malformed HGETALL reply: odd number of elements");

    // Regroup the flat array into (field, value) pairs.
    for (size_t i = 0; i < reply->elements; i += 2)
    {
        values.emplace_back(stripSpecialSym(reply->element[i]->str),
                            reply->element[i + 1]->str);
    }
    return true;
}
SubscriberStateTable::SubscriberStateTable(……)
/*
 * Subscribe to keyspace notifications for every key of `tableName`, then
 * queue a SET snapshot of all keys that already exist.
 *
 * The subscription is issued before the snapshot is taken. Presumably this
 * ordering is intentional: a key written in between is then seen at least
 * once, as a notification, a snapshot entry, or both.
 */
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
    // e.g. "__keyspace@4__:VLAN|*" for the VLAN table of database 4.
    m_keyspace = "__keyspace@" + to_string(db->getDbId()) + "__:" + tableName
               + m_table.getTableNameSeparator() + "*";
    psubscribe(m_db, m_keyspace);

    vector<string> existingKeys;
    m_table.getKeys(existingKeys);
    for (const auto &entryKey : existingKeys)
    {
        KeyOpFieldsValuesTuple snapshot;
        // Keys that vanished since getKeys() are simply not queued.
        if (m_table.get(entryKey, kfvFieldsValues(snapshot)))
        {
            kfvKey(snapshot) = entryKey;
            kfvOp(snapshot) = SET_COMMAND;
            m_buffer.push_back(snapshot);
        }
    }
}
readData
该订阅者实现了自己的数据读取函数
/*
 * Move keyspace notifications from the subscribe connection into
 * m_keyspace_event_buffer.
 *
 * Called from the Select framework when data is available on the subscribe
 * socket. Keyspace events are not persistent: anything not stored in the
 * buffer here is lost and cannot be read a second time.
 *
 * @throws std::runtime_error if hiredis reports an error while reading.
 */
void SubscriberStateTable::readData()
{
    redisReply *reply = nullptr;

    /* Read the reply that triggered this call. */
    if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
    // A null reply would later be dereferenced by pops() (getContext()->type),
    // so only real replies are buffered.
    if (reply != nullptr)
    {
        m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply));
    }

    /* Drain every further reply hiredis has already parsed into its input
     * buffer. redisGetReplyFromReader() does not read the socket again, so
     * this loop stops as soon as the buffered data is exhausted. */
    int status = REDIS_OK;
    do
    {
        reply = nullptr;
        status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
        if (status == REDIS_OK && reply != nullptr)
        {
            m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply));
        }
    } while (status == REDIS_OK && reply != nullptr);

    if (status != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
}
hasCachedData
该类订阅者自己实现了"是否还有缓存数据"的判断:只要 m_buffer 或键空间事件缓存中任一个的元素个数大于 1,就认为还有数据。相比之下,这比默认的接口实现更优。
/*
 * Report whether more than one entry is still waiting in either buffer:
 * the initial snapshot (m_buffer) or the raw keyspace events.
 */
bool SubscriberStateTable::hasCachedData()
{
    if (m_buffer.size() > 1)
    {
        return true;
    }
    return m_keyspace_event_buffer.size() > 1;
}
pops
对该类订阅者而言,所需的数据并不在订阅事件的应答中:应答中只有 key 和事件类型。如果事件不是 del,还需要根据 key 到数据库中读取具体内容。
/*
 * Remove and return the oldest buffered keyspace event.
 *
 * @return the event, or nullptr when the buffer is empty. The reply is owned
 *         by the returned shared_ptr; the caller does not free it manually.
 */
shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer()
{
    if (m_keyspace_event_buffer.empty())
    {
        return nullptr;
    }
    // Take the pointer out of the deque by swapping. This avoids the extra
    // atomic ref-count increment/decrement that copying front() would cost.
    shared_ptr<RedisReply> reply;
    reply.swap(m_keyspace_event_buffer.front());
    m_keyspace_event_buffer.pop_front();
    return reply;
}
/*
 * Pop every pending change of the subscribed table into `vkco`.
 *
 * If the initial snapshot (m_buffer) is still pending, only the snapshot is
 * returned; buffered keyspace events are left for the next call. Otherwise
 * every buffered keyspace notification is converted as follows:
 *   - a "del" event becomes a DEL_COMMAND entry with no field/values;
 *   - any other event re-reads the key with m_table.get() and becomes a
 *     SET_COMMAND entry carrying the current field/value pairs.
 * Malformed notifications, and keys that can no longer be read, are logged
 * and skipped.
 *
 * @param vkco    output; cleared first.
 * @param prefix  unused.
 */
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
    vkco.clear();

    // Hand out the initial snapshot first.
    if (!m_buffer.empty())
    {
        vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end());
        m_buffer.clear();
        return;
    }

    // popEventBuffer() returns nullptr once the event buffer is drained.
    while (auto event = popEventBuffer())
    {
        KeyOpFieldsValuesTuple kco;
        redisReply *raw = event->getContext();

        /* If the keyspace notification is empty, try the next one. */
        if (raw == nullptr || raw->type == REDIS_REPLY_NIL)
        {
            continue;
        }

        assert(raw->type == REDIS_REPLY_ARRAY);
        size_t n = raw->elements;

        /* Expecting 4 elements for each keyspace pmessage notification:
         * "pmessage", <pattern>, <channel>, <event name> */
        if (n != 4)
        {
            SWSS_LOG_ERROR("invalid number of elements %zu for pmessage of %s", n, m_keyspace.c_str());
            continue;
        }

        /* The second element should be the original pattern matched. */
        redisReply *pattern = raw->element[1];
        if (m_keyspace != pattern->str)
        {
            SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", pattern->str, m_keyspace.c_str());
            continue;
        }

        /* The third element is the channel "__keyspace@<db>__:<table entry>";
         * the table entry is everything after the first ':',
         * e.g. "VLAN_INTERFACE|Vlan1000|192.168.0.1/27". */
        redisReply *channel = raw->element[2];
        string msg(channel->str);
        size_t pos = msg.find(':');
        if (pos == msg.npos)
        {
            SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", channel->str, m_keyspace.c_str());
            continue;
        }
        string table_entry = msg.substr(pos + 1);

        /* The table name ends at the first separator; the rest is the key. */
        pos = table_entry.find(m_table.getTableNameSeparator());
        if (pos == table_entry.npos)
        {
            SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", channel->str, m_keyspace.c_str());
            continue;
        }
        string key = table_entry.substr(pos + 1);

        /* The fourth element is the event name, e.g. "hset" or "del". */
        redisReply *eventName = raw->element[3];
        if (strcmp("del", eventName->str) == 0)
        {
            kfvKey(kco) = key;
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            // The notification carries no data: fetch the current content.
            if (!m_table.get(key, kfvFieldsValues(kco)))
            {
                SWSS_LOG_ERROR("Failed to get content for table key %s", table_entry.c_str());
                continue;
            }
            kfvKey(kco) = key;
            kfvOp(kco) = SET_COMMAND;
        }

        vkco.push_back(kco);
    }
    // The loop above has drained m_keyspace_event_buffer completely.
}
正文完