sonic消息传递机制与架构2

43次阅读

共计 20200 个字符,预计需要花费 51 分钟才能阅读完成。

发布 - 订阅机制实现

基于 redis 的发布 – 订阅,sonic 实现了两套消息传递系统:

  • KEY_SET 消息系统:该机制通过一个 set 集合传递 key,通过 publish 命令通知有新的 key 产生。消费者通过 key 组合成一个 hash 表的 key,用于获取真实的消息,set 不保证顺序。样例如下所示:
 "SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8"        #在集合 INTF_TABLE_KEY_SET 中增加一个 key
 "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global" #在 hash 表 INTF_TABLE:PortChannel1:1.1.1.1/8 中添加内容
"HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
 "PUBLISH" "INTF_TABLE_CHANNEL" "G"     #通知订阅者频道 INTF_TABLE_CHANNEL 有消息,订阅者根据表名 INTF_TABLE 组合成 INTF_TABLE_KEY_SET 获取 key,然后根据 key 获取 hash 表 INTF_TABLE:PortChannel1:1.1.1.1/8 的内容,如果该内容为空则表示删除操作,否则表示 SET 操作。
  • KEY_VALUE_OP 消息系统:该消息系统采用的是 redis 的 list 进行操作,严格保证操作顺序。一次操作在 LIST 中压入三个值,分别为 key,value,operate。其中的 value 是把一个 hash 表进行 json 编码后形成了一个单一的字符串,所以订阅者得到消息后需要进行解码还原,最后一个是操作类型。
"LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"1.1.1.0/24\",\"switch_id\":\"oid:0x21000000000000\",\"table_id\":\"oid:0x0\",\"vr\":\"oid:0x3000000000043\"}" "[\"SAI_ROUTE_ENTRY_ATTR_PACKET_ACTION\",\"SAI_PACKET_ACTION_FORWARD\",\"SAI_ROUTE_ENTRY_ATTR_NEXT_HOP_ID\",\"oid:0x600000000063a\"]" "Screate"
"PUBLISH" "ASIC_STATE_CHANNEL" "G"  #通知订阅者进行消息处理,循环处理消息,一次必须从链表中拿出三个值(key、value、op)。

消费者基类

/*
 * Common base class of the consumers shown in this file: both
 * ConsumerStateTable and ConsumerTable derive from it. It combines
 * TableConsumable with RedisTransactioner.
 */
class ConsumerTableBase: public TableConsumable, public RedisTransactioner
{
public:
    /* Batch size; the derived pops() implementations pass it to their Lua
     * pop scripts as the per-call limit. */
    const int POP_BATCH_SIZE;

    /*
     * db           - database connection the consumer reads from
     * tableName    - name of the table to consume
     * popBatchSize - presumably the value stored in POP_BATCH_SIZE
     *                (definition not visible here -- TODO confirm)
     * pri          - NOTE(review): looks like a priority forwarded to a base
     *                class -- confirm against the definition
     */
    ConsumerTableBase(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    ~ConsumerTableBase() override = default;

    /* Retrieve a single entry as one key/op/fields-values tuple.
     * Implementation is not shown in this file. */
    void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);

    /* Same as above, but with the tuple split into separate out-parameters. */
    void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);

protected:

    /* NOTE(review): presumably holds entries already fetched in a batch but
     * not yet handed out by pop() -- confirm against the implementation. */
    std::deque<KeyOpFieldsValuesTuple> m_buffer;
};

KEY_SET 消息系统

ProducerStateTable

/* Derives the name of the Redis set that carries pending keys for a table. */
class TableName_KeySet {
public:
    /* Builds "<tableName>_KEY_SET" once, at construction time. */
    TableName_KeySet(const std::string &tableName)
        : m_key(tableName)
    {
        m_key += "_KEY_SET";
    }

    /* Returns a copy of the cached key-set name. */
    std::string getKeySetName() const
    {
        return m_key;
    }

private:
    std::string m_key;
};

/*
 * Producer side of the KEY_SET message system. set()/del() run Lua scripts
 * that record the key in the "<table>_KEY_SET" set, update or delete the
 * per-key hash, and publish a notification on the table's channel.
 */
class ProducerStateTable : public TableBase, public TableName_KeySet
{
public:
    ProducerStateTable(DBConnector *db, const std::string &tableName);
    /* Uses a caller-supplied pipeline; the object does not own it
     * (m_pipeowned is set to false by this constructor). */
    ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
    ~ProducerStateTable();

    /* Presumably toggles m_buffered -- implementation not shown here. */
    void setBuffered(bool buffered);
    /* Implements set() and del() commands using notification messages */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = SET_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    virtual void del(const std::string &key,
                     const std::string &op = DEL_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    /* Presumably sends the commands accumulated in the pipeline --
     * implementation not shown here. */
    void flush();

private:
    bool m_buffered;        /* when false, set()/del() flush the pipeline after every command */
    bool m_pipeowned;       /* false when the pipeline was supplied by the caller */
    RedisPipeline *m_pipe;  /* pipeline all commands are pushed into */
    std::string m_shaSet;   /* SHA of the Lua script used for ordinary set operations */
    std::string m_shaDel;   /* SHA of the Lua script used for del operations */
    std::string m_shaHmSet; /* SHA used by set() when op is HMSET_COMMAND */
    std::string m_shaMod;   /* SHA used by set() when op is MOD_COMMAND */
};
ProducerStateTable::ProducerStateTable(……)
/*
 * Pipeline-based constructor: binds the table to a caller-owned pipeline and
 * registers the Lua scripts used by set() and del(), keeping their SHAs.
 *
 * NOTE(review): set() also selects m_shaHmSet / m_shaMod, but no script is
 * loaded for them in this constructor as shown -- confirm they are
 * initialized elsewhere.
 */
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeySet(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    // Set script.
    //   KEYS[1] = channel, KEYS[2] = key set, KEYS[3..] = hash key (one per field)
    //   ARGV[1] = publish payload, ARGV[2] = key, ARGV[3..] = field/value pairs
    string setScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "for i = 0, #KEYS - 3 do\n"
        "redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
        "end\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
    m_shaSet = m_pipe->loadRedisScript(setScript);

    // Delete script: still adds the key to the key set so the consumer is told
    // about it, then removes the hash (KEYS[3]) and publishes.
    string delScript =
        "redis.call('SADD', KEYS[2], ARGV[2])\n"
        "redis.call('DEL', KEYS[3])\n"
        "redis.call('PUBLISH', KEYS[1], ARGV[1])\n";
    m_shaDel = m_pipe->loadRedisScript(delScript);
}

构造函数会加载几个 lua 脚本,分别用于 set(HSET)、del、hmset,以及用于修改(mod)的 hmset;上面的代码片段只展示了其中的 set 和 del 两个脚本。

void ProducerStateTable::set(……)

set 操作封装

void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
                 const string &op /*= SET_COMMAND*/, const string &prefix)
{
    // Assembly redis command args into a string vector
    vector<string> args;
    args.emplace_back("EVALSHA");//
    if(0 == op.compare(HMSET_COMMAND)){args.emplace_back(m_shaHmSet);
    }
    else if(0 == op.compare(MOD_COMMAND))
    {args.emplace_back(m_shaMod);
    }
    else
    {args.emplace_back(m_shaSet);
    }
    // 添加频道名称和 key 名字
    args.emplace_back(to_string(values.size() + 2));
    args.emplace_back(getChannelName());
    args.emplace_back(getKeySetName());

    args.insert(args.end(), values.size(), getKeyName(key));
    // 添加一个 G 作为参数
    args.emplace_back("G");
    args.emplace_back(key);
    for (const auto& iv: values)
    {args.emplace_back(fvField(iv));
        args.emplace_back(fvValue(iv));
    }

    // Transform data structure
    vector<const char *> args1;
    transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) {return s.c_str(); } );

    // Invoke redis command
    // 构造 redis 命令
    RedisCommand command;
    command.formatArgv((int)args1.size(), &args1[0], NULL);
    // 压入 redis 命令
    m_pipe->push(command, REDIS_REPLY_NIL);
    // 如果启用了 pipeline,则积累命令,一同发送,提升传输效率
    if (!m_buffered)
    {m_pipe->flush();
    }
}
void ProducerStateTable::del(……)
void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND*/, const string &prefix)
{
    // Assembly redis command args into a string vector
    vector<string> args;
    args.emplace_back("EVALSHA");
    args.emplace_back(m_shaDel);
    args.emplace_back("3");
    args.emplace_back(getChannelName());
    args.emplace_back(getKeySetName());
    args.emplace_back(getKeyName(key));
    args.emplace_back("G");
    args.emplace_back(key);
    args.emplace_back("''");

    // Transform data structure
    vector<const char *> args1;
    transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) {return s.c_str(); } );

    // Invoke redis command
    RedisCommand command;
    command.formatArgv((int)args1.size(), &args1[0], NULL);
    m_pipe->push(command, REDIS_REPLY_NIL);
    if (!m_buffered)
    {m_pipe->flush();
    }
}

ConsumerStateTable

/*
 * Consumer side of the KEY_SET message system: pops pending keys from the
 * "<table>_KEY_SET" set and reads the matching hashes.
 */
class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
{
public:
    /* Subscribes to the table's channel and records the current size of the
     * key set as the initial queue length. */
    ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get multiple pop elements */
    /* vkco is cleared and refilled with up to POP_BATCH_SIZE entries;
     * prefix is ignored by this implementation. */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerStateTable::ConsumerStateTable(……)
/*
 * Subscribe to the table's notification channel and read the number of keys
 * already waiting in the key set.
 *
 * The key set is WATCHed and the SCARD is queued inside a MULTI/EXEC
 * transaction; the sequence is repeated until exec() reports success.
 *
 * Fix: the command name and the key name must be separated by a space
 * ("WATCH <key>", "SCARD <key>"); the text previously concatenated them
 * directly, which produces an unknown Redis command.
 */
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeySet(tableName)
{
    for (;;)
    {
        RedisReply watch(m_db, "WATCH " + getKeySetName(), REDIS_REPLY_STATUS);
        watch.checkStatusOK();
        multi();
        enqueue(std::string("SCARD ") + getKeySetName(), REDIS_REPLY_INTEGER);
        // Exact-match subscription to the channel named "<table>_CHANNEL",
        // e.g. ROUTE_TABLE_CHANNEL.
        subscribe(m_db, getChannelName());
        bool succ = exec();
        if (succ) break;
    }

    // The queued SCARD reply is the number of keys pending at subscribe time.
    RedisReply r(dequeueReply());
    setQueueLength(r.getReply<long long int>());
}
void ConsumerStateTable::pops(……)
/*
 * Pop up to POP_BATCH_SIZE pending keys and return them, with their current
 * hash contents, in `vkco`.
 *
 * The work is done server-side by consumer_state_table_pops.lua. Each popped
 * key becomes one tuple: an empty hash is reported as DEL_COMMAND, anything
 * else as SET_COMMAND. `vkco` is always cleared first and stays empty when
 * the reply is nil. The prefix argument is ignored.
 *
 * Fix: the format string needs a space between the batch size and the
 * trailing '' so they reach the script as two separate arguments
 * ("8192" and "''"); fused together the count is not a valid integer.
 */
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
{
    // The script is read and registered with redis once; loadRedisScript
    // returns the script's SHA, which is what EVALSHA needs.
    static std::string luaScript = loadLuaScript("consumer_state_table_pops.lua");
    static std::string sha = loadRedisScript(m_db, luaScript);

    // KEYS[1] = key-set name, KEYS[2] = "<table>:", ARGV[1] = batch size.
    RedisCommand command;
    command.format(
        "EVALSHA %s 2 %s %s: %d ''",
        sha.c_str(),
        getKeySetName().c_str(),
        getTableName().c_str(),
        POP_BATCH_SIZE);

    RedisReply r(m_db, command);
    auto ctx0 = r.getContext();
    vkco.clear();

    // if the set is empty, return an empty kco object
    if (ctx0->type == REDIS_REPLY_NIL)
    {
        return;
    }

    // The reply is an array of {key, {field, value, field, value, ...}}.
    assert(ctx0->type == REDIS_REPLY_ARRAY);
    size_t n = ctx0->elements;
    vkco.resize(n);
    for (size_t ie = 0; ie < n; ie++)
    {
        auto& kco = vkco[ie];
        auto& values = kfvFieldsValues(kco);
        assert(values.empty());

        auto& ctx = ctx0->element[ie];
        assert(ctx->elements == 2);
        assert(ctx->element[0]->type == REDIS_REPLY_STRING);
        std::string key = ctx->element[0]->str;
        kfvKey(kco) = key;

        assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
        auto ctx1 = ctx->element[1];
        for (size_t i = 0; i < ctx1->elements / 2; i++)
        {
            FieldValueTuple e;
            fvField(e) = ctx1->element[i * 2]->str;
            fvValue(e) = ctx1->element[i * 2 + 1]->str;
            values.push_back(e);
        }

        // if there is no field-value pair, the key is already deleted;
        // otherwise it was added or updated, both reported as SET.
        if (values.empty())
        {
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            kfvOp(kco) = SET_COMMAND;
        }
    }
}
consumer_state_table_pops.lua

.\src\sonic-swss-common\common\consumer_state_table_pops.lua

-- Pops up to ARGV[1] keys from the set KEYS[1] and, for each popped key,
-- reads the hash stored at KEYS[2] .. key.
-- Returns a list of {key, values}, where values is the flat HGETALL reply
-- (field, value, field, value, ...).
local popped = redis.call('SPOP', KEYS[1], ARGV[1])
local ret = {}
for _, key in ipairs(popped) do
   local values = redis.call('HGETALL', KEYS[2] .. key)
   ret[#ret + 1] = {key, values}
end
return ret

ProducerStateTable 和 ConsumerStateTable 是以 app_db 为数据库构建的一个消息发布 – 订阅机制,两者是单向,无应答。生产者是 mgr,xxx_sync,agent。消费者是 orchagent。

KEY_VALUE_OP 消息系统

ProducerTable

/* Derives the name of the Redis list that carries key/value/op triples for a table. */
class TableName_KeyValueOpQueues {
public:
    /* Builds "<tableName>_KEY_VALUE_OP_QUEUE" once, at construction time. */
    TableName_KeyValueOpQueues(const std::string &tableName)
        : m_keyvalueop(tableName)
    {
        m_keyvalueop += "_KEY_VALUE_OP_QUEUE";
    }

    /* Returns a copy of the cached queue name. */
    std::string getKeyValueOpQueueTableName() const
    {
        return m_keyvalueop;
    }

private:
    std::string m_keyvalueop;
};

/*
 * Producer side of the KEY_VALUE_OP message system. Every set()/del() pushes
 * a key / JSON-encoded value / op triple onto the
 * "<table>_KEY_VALUE_OP_QUEUE" list and publishes a notification on the
 * table's channel.
 */
class ProducerTable : public TableBase, public TableName_KeyValueOpQueues
{
public:
    ProducerTable(DBConnector *db, const std::string &tableName);
    /* Uses a caller-supplied pipeline; the object does not own it
     * (m_pipeowned is set to false by this constructor). */
    ProducerTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
    /* NOTE(review): presumably opens dumpFile as m_dumpFile -- implementation
     * not shown here. */
    ProducerTable(DBConnector *db, const std::string &tableName, const std::string &dumpFile);
    virtual ~ProducerTable();

    /* Presumably toggles m_buffered -- implementation not shown here. */
    void setBuffered(bool buffered);

    /* Implements set() and del() commands using notification messages */

    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = SET_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    virtual void del(const std::string &key,
                     const std::string &op = DEL_COMMAND,
                     const std::string &prefix = EMPTY_PREFIX);

    /* Presumably sends the commands accumulated in the pipeline --
     * implementation not shown here. */
    void flush();

private:
    /* Disable copy-constructor and operator = */
    ProducerTable(const ProducerTable &other);
    ProducerTable & operator = (const ProducerTable &other);

    std::ofstream m_dumpFile;   /* when open, set()/del() also append a JSON record of each operation */
    bool m_firstItem = true;    /* true until the first record is dumped; later records are preceded by "," */
    bool m_buffered;            /* when false, every operation flushes the pipeline */
    bool m_pipeowned;           /* false when the pipeline was supplied by the caller */
    RedisPipeline *m_pipe;      /* pipeline all commands are pushed into */
    std::string m_shaEnque;     /* SHA of the LPUSH + PUBLISH Lua script */

    /* Pushes one EVALSHA of the enqueue script for (key, value, op) into the pipeline. */
    void enqueueDbChange(const std::string &key, const std::string &value, const std::string &op, const std::string &prefix);
};
ProducerTable::ProducerTable(……)

构造函数,构造一个 lua 脚本 luaEnque,该脚本用于发布消息。

/*
 * Pipeline-based constructor: binds the table to a caller-owned pipeline and
 * registers the Lua script that enqueues one message, keeping its SHA in
 * m_shaEnque.
 */
ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , TableName_KeyValueOpQueues(tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{
    /*
     * Script arguments:
     *   KEYS[1] : tableName + "_KEY_VALUE_OP_QUEUE", e.g. ASIC_STATE_KEY_VALUE_OP_QUEUE
     *   KEYS[2] : tableName + "_CHANNEL"
     *   ARGV[1] : key
     *   ARGV[2] : value
     *   ARGV[3] : op
     *   ARGV[4] : "G"
     * The triple is pushed onto the list first, then subscribers are notified.
     */
    string enqueueScript =
        "redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
        "redis.call('PUBLISH', KEYS[2], ARGV[4]);";

    m_shaEnque = m_pipe->loadRedisScript(enqueueScript);
}
void ProducerTable::enqueueDbChange(……)

执行脚本函数:

void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */)
{
    RedisCommand command;

    command.format(
        "EVALSHA %s 2 %s %s %s %s %s %s",
        m_shaEnque.c_str(),
        getKeyValueOpQueueTableName().c_str(),
        getChannelName().c_str(),
        key.c_str(),
        value.c_str(),
        op.c_str(),
        "G");

    m_pipe->push(command, REDIS_REPLY_NIL);
}
void ProducerTable::set(……)
/*
 * Enqueue a set-type operation for `key`.
 *
 * When a dump file is open, a JSON record of the operation is appended to it
 * (records are separated by ","). The field/value list is serialized with
 * JSon::buildJson and enqueued with the op prefixed by "S".
 * The pipeline is flushed unless buffering is on AND op is "set" or "bulkset".
 */
void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix)
{
    if (m_dumpFile.is_open())
    {
        // Every record after the first is preceded by a separator.
        if (m_firstItem)
            m_firstItem = false;
        else
            m_dumpFile << "," << endl;

        const string json_key = getKeyName(key);
        json j;
        j[json_key] = json::object();
        for (const auto &fv : values)
        {
            j[json_key][fvField(fv)] = fvValue(fv);
        }
        j["OP"] = op;
        m_dumpFile << j.dump(4);
    }

    // Queue the Lua script command for this change.
    enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix);

    // Only consecutive "set"/"bulkset" operations may stay buffered.
    const bool bufferableOp = (op == "set" || op == "bulkset");
    if (!(m_buffered && bufferableOp))
    {
        m_pipe->flush();
    }
}
void ProducerTable::del(……)
/*
 * Enqueue a delete-type operation for `key`.
 *
 * When a dump file is open, a JSON record (empty object for the key plus the
 * op) is appended to it. The message is enqueued with value "{}" and the op
 * prefixed by "D". The pipeline is flushed unless buffering is on.
 */
void ProducerTable::del(const string &key, const string &op, const string &prefix)
{
    if (m_dumpFile.is_open())
    {
        // Every record after the first is preceded by a separator.
        if (m_firstItem)
            m_firstItem = false;
        else
            m_dumpFile << "," << endl;

        const string json_key = getKeyName(key);
        json j;
        j[json_key] = json::object();
        j["OP"] = op;
        m_dumpFile << j.dump(4);
    }

    enqueueDbChange(key, "{}", "D" + op, prefix);

    if (m_buffered)
    {
        return;
    }
    m_pipe->flush();
}

ConsumerTable

/*
 * Consumer side of the KEY_VALUE_OP message system: pops key/value/op
 * triples from the "<table>_KEY_VALUE_OP_QUEUE" list.
 */
class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueues
{
public:
    /* Subscribes to the table's channel and records the number of messages
     * already in the queue (list length / 3) as the initial queue length. */
    ConsumerTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get multiple pop elements */
    /* vkco is cleared and refilled with up to POP_BATCH_SIZE messages;
     * prefix is prepended to the table name passed to the pop script. */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);
};
ConsumerTable::ConsumerTable(……)
/*
 * Subscribe to the table's notification channel and read how many messages
 * are already waiting in the key/value/op queue.
 *
 * The queue is WATCHed and the LLEN is queued inside a MULTI/EXEC
 * transaction; the sequence is repeated until exec() reports success.
 *
 * Fixes:
 *  - the command name and the key name must be separated by a space
 *    ("WATCH <key>", "LLEN <key>"); concatenating them directly produces an
 *    unknown Redis command;
 *  - LLEN was enqueued twice per transaction although only one reply is
 *    dequeued below; the redundant second enqueue is removed, matching the
 *    single SCARD in ConsumerStateTable's constructor.
 */
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeyValueOpQueues(tableName)
{
    for (;;)
    {
        RedisReply watch(m_db, string("WATCH ") + getKeyValueOpQueueTableName(), REDIS_REPLY_STATUS);
        watch.checkStatusOK();
        multi();
        enqueue(string("LLEN ") + getKeyValueOpQueueTableName(), REDIS_REPLY_INTEGER);
        // Subscribe to the table's channel.
        subscribe(m_db, getChannelName());
        bool succ = exec();
        if (succ) break;
    }

    RedisReply r(dequeueReply());
    long long int len = r.getReply<long long int>();
    //Key, Value and OP are in one list, they are processed in one shot
    setQueueLength(len/3);
}
void ConsumerTable::pops(……)

// 使用 lua 脚本:consumer_table_pops.lua 从数据库中获取消息

/*
 * Pop up to POP_BATCH_SIZE messages from the key/value/op queue and return
 * them in `vkco`.
 *
 * The work is done server-side by consumer_table_pops.lua. Each reply element
 * is a flat array {key, op, field, value, field, value, ...}.
 *
 * vkco   - cleared, then filled with one tuple per popped message
 * prefix - prepended to the table name passed to the script as KEYS[2]
 * Throws runtime_error when a reply element has a field without a value.
 *
 * Fix: the format string needs a space between the batch size and the
 * trailing '' so they reach the script as two separate arguments
 * ("128" and "''"); fused together ARGV[1] is not a number.
 */
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
    // The script is read and registered with redis once; EVALSHA needs its SHA.
    static std::string luaScript = loadLuaScript("consumer_table_pops.lua");
    static string sha = loadRedisScript(m_db, luaScript);

    // KEYS[1] = queue list, KEYS[2] = prefix + table name, ARGV[1] = batch size.
    RedisCommand command;
    command.format(
        "EVALSHA %s 2 %s %s %d ''",
        sha.c_str(),
        getKeyValueOpQueueTableName().c_str(),
        (prefix+getTableName()).c_str(),
        POP_BATCH_SIZE);

    RedisReply r(m_db, command, REDIS_REPLY_ARRAY);

    auto ctx0 = r.getContext();
    vkco.clear();

    // if the set is empty, return an empty kco object
    if (ctx0->type == REDIS_REPLY_NIL)
    {
        return;
    }

    assert(ctx0->type == REDIS_REPLY_ARRAY);
    size_t n = ctx0->elements;
    vkco.resize(n);

    for (size_t ie = 0; ie < n; ie++)
    {
        auto& kco = vkco[ie];
        auto& values = kfvFieldsValues(kco);
        assert(values.empty());

        // Elements 0 and 1 are the key and the op.
        auto& ctx = ctx0->element[ie];
        string key = ctx->element[0]->str;
        kfvKey(kco) = key;
        string op  = ctx->element[1]->str;
        kfvOp(kco) = op;

        // The remaining elements are field/value pairs.
        for (size_t i = 2; i < ctx->elements; i += 2)
        {
            if (i+1 >= ctx->elements)
            {
                SWSS_LOG_ERROR("invalid number of elements in returned table: %lu >= %lu", i+1, ctx->elements);
                throw runtime_error("invalid number of elements in returned table");
            }

            FieldValueTuple e;

            fvField(e) = ctx->element[i+0]->str;
            fvValue(e) = ctx->element[i+1]->str;
            values.push_back(e);
        }
    }
}
consumer_table_pops.lua

consumer_table_pops.lua 与 consumer_state_table_pops.lua 最大的不同是:consumer_table_pops.lua 采用的是有序链表(list),consumer_state_table_pops.lua 用的是 set。consumer_table_pops.lua 对业务顺序比较关心:syncd 没有业务逻辑,只是简单地执行 sai 调用,而对硬件的操作是有时序的,必须严格按照时序进行,否则系统无法正常运转;orchagent 则是软件层面,它可以等待事件所需条件都满足后才进行动作。另外一点不同的是,对于删除操作,consumer_state_table_pops.lua 不负责删除数据库的内容,而是由生产者负责清除;consumer_table_pops.lua 则必须在处理删除操作时自己清除数据库中的对应内容。

-- Pops up to ARGV[1] messages from the list KEYS[1] and mirrors each one into
-- the hash table rooted at KEYS[2].
-- Returns a list of flat arrays {key, op, field, value, field, value, ...}.
local rets = {}
-- pop Key, Value and OP together.
local popsize = ARGV[1] * 3
-- Read from the tail of the list so messages are handled strictly in the order they were pushed
local keys   = redis.call('LRANGE', KEYS[1], -popsize, -1)
-- Remove the elements that were just read from the list
redis.call('LTRIM', KEYS[1], 0, -popsize-1)
-- Number of elements read; they come in groups of three (key, value, op), and the first letter of op is a prefix
local n = table.getn(keys)
for i = n, 1, -3 do -- handle one message (three elements) per iteration
   local op = keys[i-2]
   local value = keys[i-1]
   local key = keys[i]
   local dbop = op:sub(1,1)
   op = op:sub(2)
   local ret = {key, op}
   -- Decode value back into field/value pairs (consumed as a flat array below)
   local jj = cjson.decode(value)
   local size = #jj

   for idx=1,size,2 do
       table.insert(ret, jj[idx])
       table.insert(ret, jj[idx+1])
   end
   table.insert(rets, ret)

   if op == 'bulkset' or op == 'bulkcreate' then

-- key is "OBJECT_TYPE:num", extract object type from key
       key = key:sub(1, string.find(key, ':') - 1)

       local len = #ret
       local st = 3         -- since 1 and 2 is key/op
       while st <= len do
           local field = ret[st]
-- keyname is ASIC_STATE : OBJECT_TYPE : OBJECT_ID
           local keyname = KEYS[2] .. ':' .. key .. ':' .. field

-- value can be multiple a=v|a=v|... we need to split using gmatch
           local vars = ret[st+1]
           for value in string.gmatch(vars,'([^|]+)') do
               local attr = value:sub(1, string.find(value, '=') - 1)
               local val = value.sub(value, string.find(value, '=') + 1)
               redis.call('HSET', keyname, attr, val)
           end

           st = st + 2
       end

   elseif op ~= 'flush' and op ~= 'flushresponse' and op ~= 'get' and op ~= 'getresponse' and op ~= 'notify' then
       local keyname = KEYS[2] .. ':' .. key
       if key == '' then
           keyname = KEYS[2]
       end
       -- A delete message also removes the entry from the database, which is why it no longer shows up there
       if dbop == 'D' then
           redis.call('DEL', keyname)
       else -- For additions the decoded pairs are written back as a hash, which is why the entry appears as a hash in redis
           local st = 3
           local len = #ret
           while st <= len do
               redis.call('HSET', keyname, ret[st], ret[st+1])
               st = st + 2
           end
       end
   end
end

return rets

ProducerTable 和 ConsumerTable 用于 orchagent 和 syncd 之间的生产者 – 消费者工具,以 asic_db 为数据库。

添加实例

agent 写入 app-db
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "433b5d51dc97a94f3b084255db05473699f3873a" "12" "VRF_TABLE_CHANNEL" "VRF_TABLE_KEY_SET" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "VRF_TABLE:router-3-2" "G" "router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "SADD" "VRF_TABLE_KEY_SET" "router-3-2"
[0 lua] "HMSET" "VRF_TABLE:router-3-2" "id" "router-3-2" "vni" "2001" "data_local_ip" "10.8.8.200" "data_local_oif" "PortChannel1" "vr_remote_ip" "12.0.1.87" "vr_local_ip" "10.8.8.200" "vr_local_oif" "PortChannel1" "relay_ip" "169.0.1.201" "relay_mac" "6c:ae:8b:52:d8:66" "device_type" "vr"
[0 lua] "PUBLISH" "VRF_TABLE_CHANNEL" "G"

orchagent 从 app-db 读出数据
[0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "VRF_TABLE_KEY_SET" "VRF_TABLE:" "8192" "''"
[0 lua] "SPOP" "VRF_TABLE_KEY_SET" "8192"
[0 lua] "HGETALL" "VRF_TABLE:router-3-2"
[0 lua] "HGETALL" "VRF_TABLE:router-3-1"

#orchagent 写 asic-db,value 会进行编码
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate" "G"
[1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "[\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE\",\"true\",\"SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE\",\"false\",\"SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS\",\"6C:AE:8B:52:D8:66\"]" "Screate"
[1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"

#syncd 监听后,调用脚本进行解析
[1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"
[1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
[1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
#decode 后重新写回数据库
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V4_STATE" "true"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_ADMIN_V6_STATE" "false"
[1 lua] "HSET" "ASIC_STATE:SAI_OBJECT_TYPE_VIRTUAL_ROUTER:oid:0x30000000006b6" "SAI_VIRTUAL_ROUTER_ATTR_SRC_MAC_ADDRESS" "6C:AE:8B:52:D8:66"

删除实例

# 我们以如下数据为例进行说明:
"SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
"DEL" "ROUTE_TABLE:10.254.31.0/24"
"PUBLISH" "ROUTE_TABLE_CHANNEL" "G"

#在数据库 app_db 中写入删除路由表项 10.254.31.0/24 数据
1543393647.059780 [0 127.0.0.1:33859] "SADD" "ROUTE_TABLE_KEY_SET" "10.254.31.0/24"
1543393647.066503 [0 127.0.0.1:33859] "DEL" "ROUTE_TABLE:10.254.31.0/24"
1543393647.090635 [0 127.0.0.1:33859] "PUBLISH" "ROUTE_TABLE_CHANNEL" "G"

#orchagent 作为客户端通过订阅频道 ROUTE_TABLE_CHANNEL,服务器通知了 orchagent,执行脚本 consumer_state_table_pops.lua  
#计算 sha1sum consumer_state_table_pops.lua dc95f6831ed81200867b02a0ff2f8a25db9d6540,可见执行的是 lua 脚本 consumer_state_table_pops.lua
#该脚本循环取出所有 key 的 hash 属性,一次最多取出 8192 个 key
1543393647.090864 [0 unix:/var/run/redis/redis.sock] "EVALSHA" "dc95f6831ed81200867b02a0ff2f8a25db9d6540" "2" "ROUTE_TABLE_KEY_SET" "ROUTE_TABLE:" "8192" "''"
1543393647.090934 [0 lua] "SPOP" "ROUTE_TABLE_KEY_SET" "8192"
1543393647.090997 [0 lua] "HGETALL" "ROUTE_TABLE:10.254.31.0/24"

#orchagent 获取内容后,将 hash 表数据进行序列化,即将 hash 类型转换为列表类型,作为生产者调用脚本
#"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
#"redis.call('PUBLISH', KEYS[2], ARGV[4]);"
#将内容写入数据库 1(asic-db)
1543393647.091399 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "d171e04fd79e95ca2287f3b067c46ae76a82208b" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE_CHANNEL" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove" "G"
1543393647.091510 [1 lua] "LPUSH" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}" "{}" "Dremove"
1543393647.091591 [1 lua] "PUBLISH" "ASIC_STATE_CHANNEL" "G"

#syncd 订阅了 ASIC_STATE_CHANNEL 频道,得到了通知,执行脚本 consumer_table_pops.lua
#sha1sum usr/share/swss/consumer_table_pops.lua
#22e4ca16cf37e220c92df33903a0e95585ab76fd  usr/share/swss/consumer_table_pops.lua
1543393647.091734 [1 unix:/var/run/redis/redis.sock] "EVALSHA" "22e4ca16cf37e220c92df33903a0e95585ab76fd" "2" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "ASIC_STATE" "128" "''"
1543393647.091856 [1 lua] "LRANGE" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "-384" "-1"
1543393647.091889 [1 lua] "LTRIM" "ASIC_STATE_KEY_VALUE_OP_QUEUE" "0" "-385"
1543393647.091950 [1 lua] "DEL" "ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY:{\"dest\":\"10.254.31.0/24\",\"rif_id\":\"oid:0x0\",\"switch_id\":\"oid:0x21000000000000\",\"vr\":\"oid:0x3000000000043\"}"

正文完
 0