共计 20200 个字符,预计需要花费 51 分钟才能阅读完成。
发布 - 订阅机制实现
基于 redis 的发布 – 订阅,sonic 实现了两套消息传递系统:
- KEY_SET 消息系统:该机制通过一个 set 集合传递 key,通过 publish 命令通知有新的 key 产生。消费者通过 key 组合成一个 hash 表的 key,用于获取真实的消息,set 不保证顺序。样例如下所示:
"SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8" #在集合 INTF_TABLE_KEY_SET 中增加一个 key
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" #在 hash 表 INTF_TABLE:PortChannel1:1.1.1.1/ 8 中添加内容
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
"PUBLISH" "INTF_TABLE_CHANNEL" "G" #通知订阅者频道 INTF_TABLE_CHANNEL 有消息,订阅者根据 INTF_TABLE_组合成 INTF_TABLE_KEY_SET 获取 key,然后根据 key 获取 hash 表_INTF_TABLE:PortChannel1:1.1.1.1/ 8 的内容,如果该内容为空则表示删除操作,否则表示 SET 操作。
- KEY_VALUE_OP 消息系统:该消息系统采用的是 redis 的 list 进行操作,严格保证操作顺序。一次操作在 LIST 中压入三个值,分别为 key,value,operate。其中的 value 是把一个 hash 表进行 json 编码后形成了一个单一的字符串,所以订阅者得到消息后需要进行解码还原,最后一个是操作类型。
"LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"1.1.1.0/24\",\"switch_id\":\"oid:0x21000000000000\",\"table_id\":\"oid:0x0\",\"vr\":\"oid:0x3000000000043\"}" "[\"SAI_ROUTE_ENTRY_ATTR_PACKET_ACTION\",\"SAI_PACKET_ACTION_FORWARD\",\"SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID\",\"oid:0x600000000063a\"]" "Screate"
"PUBLISH" "ASIC_STATE_CHANNEL" "G" #通知订阅者进行消息处理,循环处理消息,一次必须从链表中拿出三个 key。
消费者基类
class ConsumerTableBase: public TableConsumable, public RedisTransactioner
{
public:
const int POP_BATCH_SIZE;
ConsumerTableBase(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
~ConsumerTableBase() override = default;
void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);
void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);
protected:
std::deque<KeyOpFieldsValuesTuple> m_buffer;
};
KEY_SET 消息系统
ProducerStateTable
class TableName_KeySet {
private:
std::string m_key;
public:
TableName_KeySet(const std::string &tableName)
: m_key(tableName + "_KEY_SET")
{ }
std::string getKeySetName() const { return m_key;}
};
class ProducerStateTable : public TableBase, public TableName_KeySet
{
public:
ProducerStateTable(DBConnector *db, const std::string &tableName);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
~ProducerStateTable();
void setBuffered(bool buffered);
/* Implements set() and del() commands using notification messages */
virtual void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op = SET_COMMAND,
const std::string &prefix = EMPTY_PREFIX);
virtual void del(const std::string &key,
const std::string &op = DEL_COMMAND,
const std::string &prefix = EMPTY_PREFIX);
void flush();
private:
bool m_buffered;
bool m_pipeowned;
RedisPipeline *m_pipe;
std::string m_shaSet;
std::string m_shaDel;
std::string m_shaHmSet;
std::string m_shaMod;
};
ProducerStateTable::ProducerStateTable(……)
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: TableBase(pipeline->getDbId(), tableName)
, TableName_KeySet(tableName)
, m_buffered(buffered)
, m_pipeowned(false)
, m_pipe(pipeline)
{
string luaSet =
"redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
"redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
"redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);
string luaDel =
"redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);
}
构造函数加载了几个 lua 脚本,分别用于 hset,del,hmset,以及用于修改的 hmset。
void ProducerStateTable::set(……)
set 操作封装
void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
const string &op /*= SET_COMMAND*/, const string &prefix)
{
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");//
if(0 == op.compare(HMSET_COMMAND)){args.emplace_back(m_shaHmSet);
}
else if(0 == op.compare(MOD_COMMAND))
{args.emplace_back(m_shaMod);
}
else
{args.emplace_back(m_shaSet);
}
// 添加频道名称和 key 名字
args.emplace_back(to_string(values.size() + 2));
args.emplace_back(getChannelName());
args.emplace_back(getKeySetName());
args.insert(args.end(), values.size(), getKeyName(key));
// 添加一个 G 作为参数
args.emplace_back("G");
args.emplace_back(key);
for (const auto& iv: values)
{args.emplace_back(fvField(iv));
args.emplace_back(fvValue(iv));
}
// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) {return s.c_str(); } );
// Invoke redis command
// 构造 redis 命令
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
// 压入 redis 命令
m_pipe->push(command, REDIS_REPLY_NIL);
// 如果启用了 pipeline,则积累命令,一同发送,提升传输效率
if (!m_buffered)
{m_pipe->flush();
}
}
void ProducerStateTable::del(……)
void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix)
{
// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaDel);
args.emplace_back("3");
args.emplace_back(getChannelName());
args.emplace_back(getKeySetName());
args.emplace_back(getKeyName(key));
args.emplace_back("G");
args.emplace_back(key);
args.emplace_back("''");
// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) {return s.c_str(); } );
// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{m_pipe->flush();
}
}
ConsumerStateTable
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerStateTable::ConsumerStateTable(……)
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeySet(tableName)
{for (;;)
{RedisReply watch(m_db, "WATCH" + getKeySetName(), REDIS_REPLY_STATUS);
watch.checkStatusOK();
multi();
enqueue(std::string("SCARD") + getKeySetName(), REDIS_REPLY_INTEGER);
// 订阅频道,精确订阅,订阅的频道为:表名_CHANNEL,例如 ROUTE_TABLE_CHANNEL
subscribe(m_db, getChannelName());
bool succ = exec();
if (succ) break;
}
RedisReply r(dequeueReply());
setQueueLength(r.getReply<long long int>());
}
void ConsumerStateTable::pops(……)
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
// 使用 redis 的 lua 脚本引擎进行操作,脚本为 consumer_state_table_pops.lua
static std::string luaScript = loadLuaScript("consumer_state_table_pops.lua");
// 加载脚本,返回的是脚本的 sha
static std::string sha = loadRedisScript(m_db, luaScript);
// 构建脚本命令
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s: %d''",
sha.c_str(),
getKeySetName().c_str(),
getTableName().c_str(),
POP_BATCH_SIZE);
// 执行命令
RedisReply r(m_db, command);
auto ctx0 = r.getContext();
vkco.clear();
// if the set is empty, return an empty kco object
if (ctx0->type == REDIS_REPLY_NIL)
{return;}
// 处理返回的结果
assert(ctx0->type == REDIS_REPLY_ARRAY);
size_t n = ctx0->elements;
vkco.resize(n);
for (size_t ie = 0; ie < n; ie++)
{auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
assert(values.empty());
auto& ctx = ctx0->element[ie];
assert(ctx->elements == 2);
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
std::string key = ctx->element[0]->str;
kfvKey(kco) = key;
assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
auto ctx1 = ctx->element[1];
for (size_t i = 0; i < ctx1->elements / 2; i++)
{
FieldValueTuple e;
fvField(e) = ctx1->element[i * 2]->str;
fvValue(e) = ctx1->element[i * 2 + 1]->str;
values.push_back(e);
}
// if there is no field-value pair, the key is already deleted
// 如果没有对应的域值对,则表示删除,存在则表示添加或者更新,同一使用 SET 命令
if (values.empty())
{kfvOp(kco) = DEL_COMMAND;
}
else
{kfvOp(kco) = SET_COMMAND;
}
}
}
consumer_state_table_pops.lua
.srcsonic-swss-commoncommonconsumer_state_table_pops.lua
local ret = {}
--- 使用 SPOP 命令从 set 中弹出 keys
local keys = redis.call('SPOP', KEYS[1], ARGV[1])
-- 获取 key 中键的个数
local n = table.getn(keys)
-- 遍历每一个 key
for i = 1, n do
local key = keys[i]
-- 使用 HGETALL 命令获取 KEYS[2] .. key
local values = redis.call('HGETALL', KEYS[2] .. key)
-- 在 ret table 中添加 key:values 键值对,values 是一个 hash 表的值,多个键值对
table.insert(ret, {key, values})
end
return ret
ProducerStateTable 和 ConsumerStateTable 是以 app_db 为数据库构建的一个消息发布 – 订阅机制,两者是单向,无应答。生产者是 mgr,xxx_sync,agent。消费者是 orchagent。
KEY_VALUE_OP 消息系统
ProducerTable
class TableName_KeyValueOpQueues {
private:
std::string m_keyvalueop;
public:
TableName_KeyValueOpQueues(const std::string &tableName)
: m_keyvalueop(tableName + "_KEY_VALUE_OP_QUEUE")
{ }
std::string getKeyValueOpQueueTableName() const { return m_keyvalueop;}
};
class ProducerTable : public TableBase, public TableName_KeyValueOpQueues
{
public:
ProducerTable(DBConnector *db, const std::string &tableName);
ProducerTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
ProducerTable(DBConnector *db, const std::string &tableName, const std::string &dumpFile);
virtual ~ProducerTable();
void setBuffered(bool buffered);
/* Implements set() and del() commands using notification messages */
virtual void set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op = SET_COMMAND,
const std::string &prefix = EMPTY_PREFIX);
virtual void del(const std::string &key,
const std::string &op = DEL_COMMAND,
const std::string &prefix = EMPTY_PREFIX);
void flush();
private:
/* Disable copy-constructor and operator = */
ProducerTable(const ProducerTable &other);
ProducerTable & operator = (const ProducerTable &other);
std::ofstream m_dumpFile;
bool m_firstItem = true;
bool m_buffered;
bool m_pipeowned;
RedisPipeline *m_pipe;
std::string m_shaEnque;
void enqueueDbChange(const std::string &key, const std::string &value, const std::string &op, const std::string &prefix);
};
ProducerTable::ProducerTable(……)
构造函数,构造一个 lua 脚本 luaEnque,该脚本用于发布消息。
ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: TableBase(pipeline->getDbId(), tableName)
, TableName_KeyValueOpQueues(tableName)
, m_buffered(buffered)
, m_pipeowned(false)
, m_pipe(pipeline)
{
/*
* KEYS[1] : tableName + "_KEY_VALUE_OP_QUEUE example :ASIC_STATE_KEY_VALUE_OP_QUEUE
* ARGV[1] : key
* ARGV[2] : value
* ARGV[3] : op
* KEYS[2] : tableName + "_CHANNEL"
* ARGV[4] : "G"
*/
string luaEnque =
"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
"redis.call('PUBLISH', KEYS[2], ARGV[4]);";
m_shaEnque = m_pipe->loadRedisScript(luaEnque);
}
void ProducerTable::enqueueDbChange(……)
执行脚本函数:
void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */)
{
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %s %s %s %s",
m_shaEnque.c_str(),
getKeyValueOpQueueTableName().c_str(),
getChannelName().c_str(),
key.c_str(),
value.c_str(),
op.c_str(),
"G");
m_pipe->push(command, REDIS_REPLY_NIL);
}
void ProducerTable::set(……)
void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix)
{if (m_dumpFile.is_open())
{if (!m_firstItem)
m_dumpFile << "," << endl;
else
m_firstItem = false;
json j;
string json_key = getKeyName(key);
j[json_key] = json::object();
for (const auto &it : values)
j[json_key][fvField(it)] = fvValue(it);
j["OP"] = op;
m_dumpFile << j.dump(4);
}
// 发送 lua 脚本命令
enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix);
// Only buffer continuous "set/set" or "del" operations
if (!m_buffered || (op != "set" && op != "bulkset"))
{m_pipe->flush();
}
}
void ProducerTable::del(……)
void ProducerTable::del(const string &key, const string &op, const string &prefix)
{if (m_dumpFile.is_open())
{if (!m_firstItem)
m_dumpFile << "," << endl;
else
m_firstItem = false;
json j;
string json_key = getKeyName(key);
j[json_key] = json::object();
j["OP"] = op;
m_dumpFile << j.dump(4);
}
enqueueDbChange(key, "{}", "D" + op, prefix);
if (!m_buffered)
{m_pipe->flush();
}
}
ConsumerTable
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
ConsumerTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);
/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerTable::ConsumerTable(……)
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
{for (;;)
{RedisReply watch(m_db, string("WATCH") + getKeyValueOpQueueTableName(), REDIS_REPLY_STATUS);
watch.checkStatusOK();
multi();
enqueue(string("LLEN") + getKeyValueOpQueueTableName(), REDIS_REPLY_INTEGER);
// 订阅频道
subscribe(m_db, getChannelName());
enqueue(string("LLEN") + getKeyValueOpQueueTableName(), REDIS_REPLY_INTEGER);
bool succ = exec();
if (succ) break;
}
RedisReply r(dequeueReply());
long long int len = r.getReply<long long int>();
//Key, Value and OP are in one list, they are processed in one shot
setQueueLength(len/3);
}
void ConsumerTable::pops(……)
// 使用 lua 脚本:consumer_table_pops.lua 从数据库中获取消息
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{static std::string luaScript = loadLuaScript("consumer_table_pops.lua");
static string sha = loadRedisScript(m_db, luaScript);
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %d''",
sha.c_str(),
getKeyValueOpQueueTableName().c_str(),
(prefix+getTableName()).c_str(),
POP_BATCH_SIZE);
RedisReply r(m_db, command, REDIS_REPLY_ARRAY);
auto ctx0 = r.getContext();
vkco.clear();
// if the set is empty, return an empty kco object
if (r.getContext()->type == REDIS_REPLY_NIL)
{return;}
assert(ctx0->type == REDIS_REPLY_ARRAY);
size_t n = ctx0->elements;
vkco.resize(n);
for (size_t ie = 0; ie < n; ie++)
{auto& kco = vkco[ie];
auto& values = kfvFieldsValues(kco);
assert(values.empty());
auto& ctx = ctx0->element[ie];
string key = ctx->element[0]->str;
kfvKey(kco) = key;
string op = ctx->element[1]->str;
kfvOp(kco) = op;
for (size_t i = 2; i < ctx->elements; i += 2)
{if (i+1 >= ctx->elements)
{SWSS_LOG_ERROR("invalid number of elements in returned table: %lu >= %lu", i+1, ctx->elements);
throw runtime_error("invalid number of elements in returned table");
}
FieldValueTuple e;
fvField(e) = ctx->element[i+0]->str;
fvValue(e) = ctx->element[i+1]->str;
values.push_back(e);
}
}
}
consumer_table_pops.lua
consumer_table_pops.lua 与 consumer_state_table_pops.lua 最大的不同是。consumer_table_pops.lua 采用的是有序链表,consumer_state_table_pops.lua 用的是 set。consumer_table_pops.lua 对业务顺序比较关心,syncd 没有业务逻辑,只是简单的执行 sai 调用,对硬件的操作是有时序的,必须严格按照时序进行,否则系统无法正常运转,而 orchagent 则是软件层面,它可以等待事件所需条件都满足后才进行动作。另外一点不同的是,对于删除操作,consumer_state_table_pops.lua 不负责删除数据库的内容,而是由生产者负责清除。consumer_table_pops.lua 必须自己清除数据库的删除操作。
local rets = {}
-- pop Key, Value and OP together.
local popsize = ARGV[1] * 3
-- 从尾部开始取数据,严格按照时序进行
local keys = redis.call('LRANGE', KEYS[1], -popsize, -1)
-- 将提取出来的 key 从 list 中删除
redis.call('LTRIM', KEYS[1], 0, -popsize-1)
-- 获取提取的 key 的个数,key 三个为一组,分别构成消息的 key,value,op,其中 op 的第一个字母为前缀
local n = table.getn(keys)
for i = n, 1, -3 do -- 三组一个处理消息
local op = keys[i-2]
local value = keys[i-1]
local key = keys[i]
local dbop = op:sub(1,1)
op = op:sub(2)
local ret = {key, op}
-- 解码 value,还原成键值对
local jj = cjson.decode(value)
local size = #jj
for idx=1,size,2 do
table.insert(ret, jj[idx])
table.insert(ret, jj[idx+1])
end
table.insert(rets, ret)
if op == 'bulkset' or op == 'bulkcreate' then
-- key is "OBJECT_TYPE:num", extract object type from key
key = key:sub(1, string.find(key, ':') - 1)
local len = #ret
local st = 3 -- since 1 and 2 is key/op
while st <= len do
local field = ret[st]
-- keyname is ASIC_STATE : OBJECT_TYPE : OBJECT_ID
local keyname = KEYS[2] .. ':' .. key .. ':' .. field
-- value can be multiple a=v|a=v|... we need to split using gmatch
local vars = ret[st+1]
for value in string.gmatch(vars,'([^|]+)') do
local attr = value:sub(1, string.find(value, '=') - 1)
local val = value.sub(value, string.find(value, '=') + 1)
redis.call('HSET', keyname, attr, val)
end
st = st + 2
end
elseif op ~= 'flush' and op ~= 'flushresponse' and op ~= 'get' and op ~= 'getresponse' and op ~= 'notify' then
local keyname = KEYS[2] .. ':' .. key
if key == '' then
keyname = KEYS[2]
end
-- 删除命令,还要负责删除数据库,这就是在数据库中为什么我们看不到信息了
if dbop == 'D' then
redis.call('DEL', keyname)
else -- 对于添加,还需要将解码后的键值对,重新以 hash 表的方式插回去,这就是我们在 redis 数据库中看到的是 hash
local st = 3
local len = #ret
while st <= len do
redis.call('HSET', keyname, ret[st], ret[st+1])
st = st + 2
end
end
end
end
return rets
ProducerTable 和 ConsumerTable 用于 orchagent 和 syncd 之间的生产者 – 消费者工具,以 asic_db 为数据库。
添加实例
agent 写入 app-db
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "433b5d51dc97a94f3b084255db05473699f3873a" "12" "VRF_TABLE_CHANNEL" "VRF_TABLE_KEY_SET" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "G" "router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "SADD" "VRF_TABLE_KEY_SET" "router-3-2"
[0 lua] "HMSET" "VRF_TABLE:router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "PUBLISH" "VRF_TABLE_CHANNEL" "G"
orchagent 从 app-db 读出数据
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "VRF_TABLE_KEY_SET" "VRF_TABLE:" "8192" "''"[0 lua]"SPOP""VRF_TABLE_KEY_SET" "8192"
[0 lua] "HGETALL" "VRF_TABLE:router-3-2"
[0 lua] "HGETALL" "VRF_TABLE:router-3-1"
#orchagent 写 asic-db,value 会进行编码
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate" "G"
[1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate"
[1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"
#syncd 监听后,调用脚本进行解析
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"[1 lua]"LRANGE""ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
[1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
#decode 后重新写回数据库
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE" "true"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE" "false"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS" "6C:AE:8B:52:D8:66"
删除实例
# 我们以如下数据为例进行说明:"SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
"DEL" "ROUTE_TABLE:10.254.31.0/24"
"PUBLISH" "ROUTE_TABLE_CHANNEL" "G"
#在数据库 app_db 中写入删除路由表项 10.254.31.0/24 数据
1543393647.059780 [0 127.0.0.1:33859] "SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
1543393647.066503 [0 127.0.0.1:33859] "DEL" "ROUTE_TABLE:10.254.31.0/24"
1543393647.090635 [0 127.0.0.1:33859] "PUBLISH" "ROUTE_TABLE_CHANNEL" "G"
#orchagent 作为客户端通过订阅频道 ROUTE_TABLE_CHANNEL,服务器通知了 orchagent,执行脚本 consumer_state_table_pops.lua
#计算 sha1sum consumer_state_table_pops.lua dc95f6831ed81200867b02a0ff2f8a25db9d6540,可见执行的是 lua 脚本 consumer_state_table_pops.lua
#该脚本循环取出所有 key 的 hash 属性,一次最多取出 8192 个 key
1543393647.090864 [0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "ROUTE_TABLE_KEY_SET" "ROUTE_TABLE:" "8192" "''"1543393647.090934 [0 lua]"SPOP""ROUTE_TABLE_KEY_SET" "8192"
1543393647.090997 [0 lua] "HGETALL" "ROUTE_TABLE:10.254.31.0/24"
#orchagent 获取内容后,将 hash 表数据进行序列化,即将 hash 类型转换为列表类型,作为生产者调用脚本
#"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
#"redis.call('PUBLISH', KEYS[2], ARGV[4]);"
#将内容写入数据库 1(asic-db))
1543393647.091399 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove" "G"
1543393647.091510 [1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove"
1543393647.091591 [1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"
#syncd 订阅了 ASIC_STATE_CHANNEL 频道,得到了通知,执行脚本 consumer_table_pops.lua
#sha1sum usr/share/swss/consumer_table_pops.lua
#22e4ca16cf37e220c92df33903a0e95585ab76fd usr/share/swss/consumer_table_pops.lua
1543393647.091734 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"1543393647.091856 [1 lua]"LRANGE""ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
1543393647.091889 [1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
1543393647.091950 [1 lua] "DEL" "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}"