sonic 消息传递机制与架构(3)

21次阅读

共计 7028 个字符,预计需要花费 18 分钟才能阅读完成。

本章节主要分析 sonic 使用 redis 键空间通知(keyspace notification)机制实现的消息传递框架。该机制与发布-订阅机制的区别在于:发布者不需要显式执行 publish 通知,只要往数据库中写入指定的键,redis 就会通知监听了该键空间的客户端(前提是 redis 通过 notify-keyspace-events 配置开启了键空间事件)。该机制目前只用于监听 config_db 中配置的变化,然后将其同步到 app_db。使用该机制的案例有 VlanMgr、IntfMgr、portsyncd 等:可以通过 orch 包装使用,比如 VlanMgr;也可以直接定义 SubscriberStateTable 表,比如 portCfg。

redis 键空间事件机制样例

# 在数据库 0 中订阅以 tom 开头的键
127.0.0.1:6379> PSUBSCRIBE __keyspace@0__:tom*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyspace@0__:tom*"
3) (integer) 1

#在数据库 0 中添加 hash 表 tom
127.0.0.1:6379> HMSET tom|1 name tom age 28 
OK

#订阅者得到应答
1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "hset"

#删除 key
127.0.0.1:6379> DEL tom|1
(integer) 1
127.0.0.1:6379> 

1) "pmessage"
2) "__keyspace@0__:tom*"
3) "__keyspace@0__:tom|1"
4) "del"

键空间消息机制实现

/*
 * Consumer that follows one table through Redis keyspace notifications.
 *
 * The constructor psubscribes to "__keyspace@<db>__:<table><separator>*" and
 * preloads the table's existing entries into m_buffer as SET operations.
 * readData() stores every raw notification in m_keyspace_event_buffer; pops()
 * later converts them into KeyOpFieldsValuesTuple entries, re-reading the
 * current field values from the DB for every event that is not "del".
 */
class SubscriberStateTable : public ConsumerTableBase
{
public:
    /* Subscribe to the table's keyspace, then snapshot its current content into m_buffer. */
    SubscriberStateTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0);

    /* Get all elements available.
     * While the constructor snapshot (m_buffer) is non-empty only the snapshot
     * is returned; otherwise all buffered keyspace events are converted.
     * The prefix argument is ignored by this implementation. */
    void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

    /* Read keyspace events from redis into m_keyspace_event_buffer.
     * Throws std::runtime_error if hiredis reports a read error. */
    void readData() override;
    /* True when m_buffer or m_keyspace_event_buffer holds more than one item. */
    bool hasCachedData() override;
    /* True while m_buffer (filled with the table's existing entries in the
     * constructor) has not been drained by pops(). */
    bool initializedWithData() override
    {return !m_buffer.empty();
    }

private:
    /* Pop the oldest keyspace event from the event buffer; nullptr when empty.
     * Ownership is shared through the returned shared_ptr, so the old
     * "caller should free resources" note looks outdated
     * (NOTE(review): confirm RedisReply frees the hiredis reply in its destructor). */
    std::shared_ptr<RedisReply> popEventBuffer();

    /* psubscribe pattern built in the constructor; pops() rejects any
     * pmessage whose reported pattern differs from it. */
    std::string m_keyspace;
    // Raw keyspace notifications read by readData(), in arrival order,
    // waiting to be converted by pops().
    std::deque<std::shared_ptr<RedisReply>> m_keyspace_event_buffer;
    // Direct view of the subscribed table. It is used for the initial snapshot
    // in the constructor, and to fetch a key's current fields whenever a
    // non-"del" keyspace event arrives (notifications carry no field data).
    Table m_table;
};

class Table

/*
 * Direct read/write access to one Redis table through a RedisPipeline.
 * Only the pipeline constructor and get() are shown in this article; other
 * member descriptions below marked "presumably" are inferred and unverified.
 */
class Table : public TableBase, public TableEntryEnumerable {
public:
    /* Presumably creates its own pipeline on db (see m_pipeowned) — TODO confirm. */
    Table(DBConnector *db, const std::string &tableName);
    /* Uses a caller-supplied pipeline; sets m_pipeowned to false. */
    Table(RedisPipeline *pipeline, const std::string &tableName, bool buffered);
    ~Table() override;

    /* Set an entry in the DB directly (op not in use) */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);
    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX);

    /* Read all field/value pairs of key from the DB directly (HGETALL).
     * ovalues is cleared first. Returns false if the reply is empty, i.e. the
     * key doesn't exist. Throws std::system_error on a malformed reply. */
    virtual bool get(const std::string &key, std::vector<FieldValueTuple> &ovalues);

    /* Fills keys with the table's keys; SubscriberStateTable uses it to
     * snapshot the table (presumably without the table-name prefix — TODO confirm). */
    void getKeys(std::vector<std::string> &keys);

    /* Presumably switches between buffered and immediate pipeline writes — TODO confirm. */
    void setBuffered(bool buffered);

    /* Presumably sends any commands still queued in the pipeline — TODO confirm. */
    void flush();

    /* Presumably copies the whole table into tableDump — TODO confirm. */
    void dump(TableDump &tableDump);

protected:

    // "buffered" flag given to the pipeline constructor.
    bool m_buffered;
    // false when the pipeline was supplied by the caller (pipeline constructor).
    bool m_pipeowned;
    // Pipeline every command goes through (e.g. HGETALL in get()).
    RedisPipeline *m_pipe;

    /* Strip special symbols from keys used for type identification
     * (get() applies it to every field name it returns).
     * Input example:
     *     port@
     * DB entry:
     * 1) "ports@"
     * 2) "Ethernet0,Ethernet4,...
     * NOTE(review): the example spells "port@" vs "ports@" — confirm which is intended.
     * */
    std::string stripSpecialSym(const std::string &key);
};

Table::Table

Table::Table(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(pipeline->getDbId(), tableName)
    , m_buffered(buffered)
    , m_pipeowned(false)
    , m_pipe(pipeline)
{}

Table::get

bool Table::get(const string &key, vector<FieldValueTuple> &values)
{
    /*
        127.0.0.1:6379[4]> HGETALL "VLAN|Vlan1000"
        1) "vlanid"
        2) "1000"
        127.0.0.1:6379[4]> 
    */
    RedisCommand hgetall_key;
    hgetall_key.format("HGETALL %s", getKeyName(key).c_str());
    RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY);
    redisReply *reply = r.getContext();
    values.clear();

    if (!reply->elements)
        return false;

    if (reply->elements & 1)// 必须是偶数, 键值对
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    // 整理键值对
    for (unsigned int i = 0; i < reply->elements; i += 2)
    {values.emplace_back(stripSpecialSym(reply->element[i]->str),
                                    reply->element[i + 1]->str);
    }

    return true;
}

SubscriberStateTable::SubscriberStateTable(……)

/*
 * Subscribe to keyspace events of tableName in db, then queue the table's
 * current content in m_buffer (as SET operations) so that the first pops()
 * delivers a full snapshot before any incremental event.
 */
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
    // Pattern matching every key of this table in this DB, e.g. "__keyspace@4__:VLAN|*".
    // It is subscribed before the snapshot below is read — presumably so that a
    // change racing with the snapshot still produces an event.
    m_keyspace = "__keyspace@" + to_string(db->getDbId()) + "__:" + tableName
               + m_table.getTableNameSeparator() + "*";
    psubscribe(m_db, m_keyspace);

    vector<string> existingKeys;
    m_table.getKeys(existingKeys);

    for (const auto &entryKey : existingKeys)
    {
        KeyOpFieldsValuesTuple entry;
        kfvKey(entry) = entryKey;
        kfvOp(entry) = SET_COMMAND;

        // get() returns false for an empty/absent key; such keys are not queued.
        if (m_table.get(entryKey, kfvFieldsValues(entry)))
        {
            m_buffer.push_back(entry);
        }
    }
}

readData

该订阅者实现了自己的数据读取函数

/*
 * Read the keyspace notifications available on the subscription connection
 * and append them to m_keyspace_event_buffer.
 *
 * Called from the Select framework when data is available on the socket.
 * After the first redisGetReply(), the loop drains any further replies that
 * hiredis has already parsed into its reader buffer. Keyspace events are not
 * persistent: a reply not stored here cannot be read a second time.
 *
 * @throws std::runtime_error if hiredis reports an error while reading.
 */
void SubscriberStateTable::readData()
{
    redisReply *reply = nullptr;

    if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
    // A null reply carries no event; storing it would make pops() dereference null.
    if (reply != nullptr)
    {
        m_keyspace_event_buffer.push_back(make_shared<RedisReply>(reply));
    }

    // Drain replies already sitting in the reader buffer (no socket read).
    int status;
    do
    {
        reply = nullptr;
        status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
        if (reply != nullptr && status == REDIS_OK)
        {
            m_keyspace_event_buffer.push_back(make_shared<RedisReply>(reply));
        }
    }
    while (reply != nullptr && status == REDIS_OK);

    if (status != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
}

hasCachedData

该类订阅者自己实现了 hasCachedData:只要 m_buffer(构造时读取的快照)或 m_keyspace_event_buffer(键空间事件缓存)中任一缓存的条目数大于 1,就认为还有缓存数据。相比之下,这比默认的接口更优。

/*
 * Report whether data remains cached: true if either the snapshot buffer
 * (m_buffer) or the keyspace event buffer holds more than one element.
 */
bool SubscriberStateTable::hasCachedData()
{
    const bool snapshotPending = m_buffer.size() > 1;
    const bool eventsPending = m_keyspace_event_buffer.size() > 1;
    return snapshotPending || eventsPending;
}

pops

对该类订阅者而言,期望的数据并不直接包含在订阅事件的应答中:应答里只有 key 和事件类型。如果事件不是 del,就需要根据事件中的 key 再去数据库读取具体内容。

/*
 * Remove and return the oldest buffered keyspace event.
 *
 * @return the event, or nullptr when m_keyspace_event_buffer is empty.
 */
shared_ptr<RedisReply> SubscriberStateTable::popEventBuffer()
{
    if (m_keyspace_event_buffer.empty())
    {
        return nullptr;
    }

    // Take ownership of the front element by swapping, avoiding the atomic
    // refcount increment/decrement a copy would cost, then drop the empty slot.
    shared_ptr<RedisReply> reply;
    reply.swap(m_keyspace_event_buffer.front());
    m_keyspace_event_buffer.pop_front();

    return reply;
}

/*
 * Convert buffered data into KeyOpFieldsValuesTuple entries.
 *
 * If the constructor snapshot (m_buffer) is non-empty, it is returned as-is
 * and keyspace events are left for a later call. Otherwise every buffered
 * keyspace notification is consumed. A valid pmessage has four elements:
 *     1) "pmessage"
 *     2) the matched pattern, e.g. "__keyspace@4__:VLAN|*"
 *     3) the channel,         e.g. "__keyspace@4__:VLAN|Vlan1000"
 *     4) the event name,      e.g. "hset" or "del"
 * A "del" event becomes DEL_COMMAND. Any other event triggers a DB read of the
 * key and becomes SET_COMMAND with the current fields. Malformed notifications
 * and keys that can no longer be read are logged and skipped.
 *
 * @param vkco  cleared, then filled with the resulting operations
 * @param prefix unused by this implementation
 */
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
    vkco.clear();

    if (!m_buffer.empty())
    {
        vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end());
        m_buffer.clear();
        return;
    }

    // The loop ends only when popEventBuffer() reports an empty buffer.
    while (auto event = popEventBuffer())
    {
        const redisReply *message = event->getContext();

        /* if the Key-space notification is empty, try next one. */
        if (message == nullptr || message->type == REDIS_REPLY_NIL)
        {
            continue;
        }

        // Checked at runtime rather than only by assert(), so release builds
        // never index into a reply that is not an array.
        if (message->type != REDIS_REPLY_ARRAY)
        {
            SWSS_LOG_ERROR("invalid reply type %d for pmessage of %s", message->type, m_keyspace.c_str());
            continue;
        }

        /* Expecting 4 elements for each keyspace pmessage notification */
        size_t n = message->elements;
        if (n != 4)
        {
            SWSS_LOG_ERROR("invalid number of elements %zu for pmessage of %s", n, m_keyspace.c_str());
            continue;
        }

        const redisReply *pattern = message->element[1];
        const redisReply *channel = message->element[2];
        const redisReply *op = message->element[3];

        // Building std::string from, or strcmp() on, a null str is undefined behavior.
        if (pattern->str == nullptr || channel->str == nullptr || op->str == nullptr)
        {
            SWSS_LOG_ERROR("non-string element in pmessage of %s", m_keyspace.c_str());
            continue;
        }

        /* The second element should be the original pattern matched */
        if (m_keyspace != pattern->str)
        {
            SWSS_LOG_ERROR("invalid pattern %s returned for pmessage of %s", pattern->str, m_keyspace.c_str());
            continue;
        }

        // Channel is "__keyspace@<db>__:<entry>"; the prefix contains no ':',
        // so the first ':' separates it from the table entry (whose key may
        // itself contain ':', e.g. IPv6 addresses).
        string msg(channel->str);
        size_t pos = msg.find(':');
        if (pos == msg.npos)
        {
            SWSS_LOG_ERROR("invalid format %s returned for pmessage of %s", channel->str, m_keyspace.c_str());
            continue;
        }

        // e.g. "VLAN_INTERFACE|Vlan1000|192.168.0.1/27": the text before the
        // first table-name separator is the table name, the rest is the key.
        string table_entry = msg.substr(pos + 1);
        pos = table_entry.find(m_table.getTableNameSeparator());
        if (pos == table_entry.npos)
        {
            SWSS_LOG_ERROR("invalid key %s returned for pmessage of %s", channel->str, m_keyspace.c_str());
            continue;
        }
        string key = table_entry.substr(pos + 1);

        KeyOpFieldsValuesTuple kco;
        if (strcmp("del", op->str) == 0)
        {
            kfvKey(kco) = key;
            kfvOp(kco) = DEL_COMMAND;
        }
        else
        {
            // The notification carries no field data: read the current content.
            if (!m_table.get(key, kfvFieldsValues(kco)))
            {
                SWSS_LOG_ERROR("Failed to get content for table key %s", table_entry.c_str());
                continue;
            }
            kfvKey(kco) = key;
            kfvOp(kco) = SET_COMMAND;
        }

        vkco.push_back(kco);
    }
}

正文完
 0