关于缓存:Datenlord-Etcd-客户端缓存实践

1次阅读

共计 6600 个字符,预计需要花费 17 分钟才能阅读完成。

简介和背景

Etcd 是一个反对强一致性的分布式 KV 存储,次要用于 metadata 的治理、服务发现、分布式锁等场景。DatenLord 应用 etcd 来治理集群的 metadata,DatenLord 的利用会频繁查问 etcd 中的 metadata,然而极少更改,是典型的读多写少的场景。在应用过程中咱们发现一次 etcd 操作带来的网络开销成为了性能瓶颈,所以咱们想到通过实现客户端缓存的形式来省去不必要的网络的开销。具体想法是在客户端缓存查问后果,这样每次须要查问 etcd 的时候,如果缓存命中,则只须要拜访一次本地内存省掉一次网络开销,如果没有命中,再到 etcd 中查问,并将查问后果退出缓存,这极大地升高了频繁查问 etcd 所带来的开销和提早。接下来会具体介绍咱们的实现和遇到的问题以及解决方案。

Etcd Watch 机制

在启用缓存之后就会面临本地缓存更新的问题,因为 etcd 是一个分布式的 KV 存储,容许多个客户端并发操作并保障一致性,那么如果其余客户端更新了 etcd 中的数据,那么如何更新本客户端本地缓存中的数据呢?一个计划是定时轮询 etcd 来获取数据更新,然而这个计划有显著弊病,如轮询的距离设定,如果设置过大将无奈及时更新本地缓存,导致长时间拿到旧的数据,如果设置很短,随着本地缓存数据量的减少,逐个轮询会同时减少 etcd 和客户端的累赘。

侥幸的是 etcd 提供了 watch 机制能够克服轮询计划的弊病。etcd 的 watch 机制容许客户端通知 etcd 它要关注的 key,如果该 key 有任何批改,etcd 会告诉客户端。具体实现形式是在客户端和服务器端建设一个长连贯,这个长连贯提供了两个性能,1. 客户端会通过这个长连贯来通知 etcd 它要 watch 的 key,以及不再想 watch 某些 key,即对应 WatchCreateRequest 和 WatchCancelRequest。watch 申请能够针对单个 key 或者 prefix。2. watch 申请胜利后,只有该 key 或者满足 prefix 的 key 有更新,etcd 就会通过这个长连贯以 WatchResponse 的模式告诉到客户端。须要留神的是所有的 watch 申请都复用这一条长连贯。咱们来看一下 WatchCreateRequest,WatchCancelRequest,WatchResponse,Event 的 protobuf 定义

message WatchCreateRequest {
  bytes key = 1;
  bytes range_end = 2;
  int64 start_revision = 3;
  bool progress_notify = 4;

  enum FilterType {
    NOPUT = 0;
    NODELETE = 1;
  }
  repeated FilterType filters = 5;
  bool prev_kv = 6;
}

message WatchCancelRequest {int64 watch_id = 1;}

message WatchResponse {
  ResponseHeader header = 1;
  int64 watch_id = 2;
  bool created = 3;
  bool canceled = 4;
  int64 compact_revision = 5;

  repeated mvccpb.Event events = 11;
}

message Event {
  enum EventType {
    PUT = 0;
    DELETE = 1;
  }
  EventType type = 1;
  KeyValue kv = 2;
  KeyValue prev_kv = 3;
}

具体每个 field 的意义请参考官网文档(etcd3 API | etcd),就不一一解释,这里只介绍咱们关怀的 field.

首先每一个 WatchCreateRequest 会收到一个 WatchResponse,之后的更新也会以 WatchResponse 的模式从 etcd 发送到客户端,WatchResponse 外面蕴含一个 watch_id,这个 id 代表了该回复对应的 watch,因为所有的 watch 申请都会复用同一个长链接,所以客户端须要本人保护 watched key/prefix 到 watch_id 的 mapping,以便晓得这个 WatchResponse 对应的是那个 key 或者 prefix,并且勾销 watch 发送的 WatchCancelRequest 也是用 watch_id 作为惟一标识符。

另外一个咱们关系的 filed 是 WatchCreateRequest 里的 start_revision 示意客户端心愿从 key 的哪一个 revision 开始 watch,etcd 会把从该 revision 开始之后所有的更新都发送给客户端。对于 revision 前面会有具体介绍这里先跳过。

Etcd watch 机制保障

Etcd watch 机制做出了三条保障,基于这三条保障用户才能够建设一套牢靠的利用

● 程序性:etcd 会依照 revision 的程序来发送音讯,revision 代表了事件产生的先后顺序
● 可靠性:间断的事件不会失落,如果收到了产生更晚的事件,那么早于这个事件的事件肯定曾经被收到
● 原子性:在同一个 revision 产生的事件不会分成多个音讯来发送

Etcd Revision

Etcd 实现了 MVCC,每当存储的数据产生扭转,etcd 就会把全局的 revision 加一来示意产生了一个新的版本,并会保留每一个版本的数据。因为 revision 是全局枯燥递增的,所以 revision 代表了批改产生的程序,revision 大的数据肯定比 revision 小的数据更新。因为 etcd 保留了每一个 revision 的数据,所以 etcd 反对历史 revision 的查问,后面提到的从 key 的某一个 revision 开始 watch 就是这个原理。etcd 的 KeyValue 定义如下:

message KeyValue {
  bytes key = 1;
  int64 create_revision = 2;
  int64 mod_revision = 3;
  int64 version = 4;
  bytes value = 5;
  int64 lease = 6;
}

一个 KeyValue 关联了三个版本号,

● create_revision: 该 key 被创立时的 revision
● mod_revision:该 key 最初一次被批改时候的 revision
● version:该 key 在最近一次被创立后经验了多少个版本,每一次批改 version 会加一

咱们次要应用了 mod_revision,咱们能够通过比拟同一个 key 的 mod_revision 来晓得哪一个值更新。

客户端缓存的实现

有了 etcd 的 watch 机制和 revision 咱们就能够实现一个客户端的缓存。客户端会保护一个从 key 到 KeyValue 的 hashmap,并告诉 etcd 咱们须要 watch 这些 key,从而收到数据的更新。

缓存

缓存是一个无锁的 hashmap 用于存 key 到 CacheEntry 的映射和一个优先队列用于淘汰缓存。CacheEntry 蕴含 revision 和 Option<KeyValue> 两个 field,revision 的目标是在多线程并发更新缓存的时候,应用 revision 来保障缓存中的数据是目前最新的,因为 etcd 的 revision 是全局惟一枯燥递增的,所以对同一个 key 来说,revision 更大的值更新。在更新缓存的时候,会查看本人的 mod_revision 是否比缓存中的 revision 高,如果高则会更新,否则不会更新。应用 Option<KeyValue> 而不是间接用 KeyValue 的目标在前面 Get 申请局部会有介绍。

/// Cache entry
#[derive(Debug, Clone, PartialEq)]
pub struct CacheEntry {
    /// current revision of key in cache
    revision: i64,
    /// key value, None means key has been deleted
    /// but watch is not cancelled yet.
    kv: Option<KeyValue>,
}
/// Cache struct contains a lock-free hashTable.
pub struct Cache {
    /// map to store key value
    hashtable: LockFreeCuckooHash<Vec<u8>, CacheEntry>,
    /// lru queue of the keys in hashtable.
    lru_queue: Mutex<PriorityQueue<Vec<u8>, u64>>,
}

Get 申请

当客户端执行 Get 申请,它会先查找本地缓存,如果本地缓存命中则间接返回给用户。如果没有命中则会把申请给 etcd,当拿到 etcd 的返回后会做两件事件

1. 把拿到的值插入到缓存中,这里就用到了 CacheEntry 中的 revision,如果两个线程同时来缓存查问同一个 key,发现缓存中没有命中,同时将申请发到 etcd,拿到了两个不同的值,那么这两个不同的值会对应不同的 mod_revision,有了 revision 的保障,只有更新的值才会插入到缓存中。

2. 向 etcd 发动对该 key 的 watch 申请,只有当 key 第一次插入缓存的时候才会发动 watch 申请,多个线程同时插入缓存只有一个线程会胜利插入,其余线程要么变成更新缓存要么因为 revision 不够高而放弃, 这样就防止的多个线程同时插入缓存产生反复 watch 的状况。在发送 watch 申请的时候会以 key 以后的 mod_revision 作为 WatchCreateRequest 的 start_revision,这样保障了不会失落在建设 watch 申请过程中该 key 在 etcd 中的批改。

因为咱们会在 key 被第一次加到缓存中的时候建设对该 key 的 watch,所以为了避免频繁建设勾销 watch,在 key 被删掉的时候,咱们只是把缓存中该 key 对应的 CacheEntry 中的 kv 置成 None 来示意 key 曾经被删掉了,而没有发送 WatchCancelRequest 来勾销 watch,这样当 key 被从新在 etcd 中创立的时候,就会收到 watch 更新,不须要从新建设 watch。只有当缓存中的 key 的数量超过阈值的时候,会触发 LRU 回收,发送勾销 watch 申请,在收到 etcd 的回复时,再将 key 从缓存中删除。Get 申请的代码如下

pub async fn get(&self, req: EtcdGetRequest) -> Res<EtcdGetResponse> {
    // cache is enabled
    if let Some(ref kvcache) = self.kvcache {if let Some(value) = kvcache.load().cache.search(req.get_key()).await {let mut response = RangeResponse::new();
            response.set_count(1);
            response.set_kvs(RepeatedField::from_vec(vec![value]));
            return Ok(EtcdGetResponse::new(response));
        }
    }

    let resp = retryable!(|| async {let resp = self.client.range_async(&req.clone().into())?;
        Ok(resp.await?)
    });

    if let Some(ref kvcache_arc) = self.kvcache {let kvs = resp.get_kvs();
        let kvcache = kvcache_arc.load();
        for kv in kvs {let (succeed, is_insert) = kvcache
                .cache
                .insert_or_update(kv.get_key().to_vec(), kv.clone())
                .await;
            if succeed && is_insert {
                // Creates a new watch request and adds to the send queue.
                let watch_request =
                    LocalWatchRequest::create(kv.get_key().to_vec(), kv.get_mod_revision());
                if let Err(e) = kvcache.watch_sender.send(watch_request).await {
                    warn!("Fail to send watch request, the error is {}, restart cache",
                        e
                    );
                    self.restart_kvcache();
                    return Err(e.into());
                }
                // Adjust cache size
                ......
            }
        }
    }
    Ok(From::from(resp))
}

Put 申请和 Delete 申请

Put 申请和 Delete 申请都是批改操作,解决形式基本相同。首先申请会间接发送到 etcd,如果该 key 没有在缓存中,那么间接将申请的回复返回给用户。如果该 key 曾经在缓存中了,那么要等到缓存中值更新到以后的值或者更新的值得时候再返回给用户(通过 revision 来保障),因为 Put 申请是一次批改操做,并且咱们曾经 watch 过该 key 了,那么 etcd 肯定会通过 watch 来告诉本次更新,甚至更新的批改。这样做的目标是如果不期待缓存更新就间接返回给用户,那么该用户在拿到回复后立刻做一次 Get 申请,那么就有几率读到旧的值,好像刚刚的 Put 申请没有失效,这个是不合乎用户认知的,并会毁坏一致性。Put 申请的代码如下:

pub async fn put(&self, req: EtcdPutRequest) -> Res<EtcdPutResponse> {
    let resp: EtcdPutResponse = retryable!(|| async {let resp = self.client.put_async(&req.clone().into())?;
        Ok(From::from(resp.await?))
    });
    // Wait until cache is updated and then return
    if let Some(ref kvcache) = self.kvcache {while let Some(kv) = kvcache.load().cache.search(req.get_key()).await {if kv.get_mod_revision() >= resp.get_revision() {break;}
            Timer::after(Duration::from_millis(1)).await;
        }
    }
    Ok(resp)
}

客户端缓存的局限性

应用客户端缓存后,极大地减速了拜访 etcd 的速度,然而这种实现有它的局限性,次要体现在两点:

1. 无奈反对 prefix 缓存:因为缓存和 etcd 都是以 key-value 的模式来存储数据,一个 prefix 可能会对应任意多个 key,如果想要在缓存中查找 prefix 就须要有和 etcd 一样的全量数据,这个对于缓存是不事实的。所以咱们的实现放弃了对 prefix 缓存的反对,所有的针对 prefix 的申请都会间接发到 etcd。

2. 某些场景下会进化成最终一致性:咱们在 Put 申请中做了肯定的优化来保障一致性,然而还是不能保障所有场景的强一致性。咱们来假如这样一个场景,本地主机上有两个过程,别离起了两个客户端,也就对应了两份缓存。线程 1 把 key A 的值从 1 批改到 2,而后通过某种过程间通信机制通知线程 2 key A 的值曾经改成 2 了,这个时候线程 2 去 etcd 查问 key A 的值,如果之前 key A 被缓存过,在 etcd 的 watch 告诉达到之前,缓存里的值还是 1,这个时候线程 2 就读到了旧的值,毁坏了 etcd 保障的强一致性,然而最终线程 2 的缓存会收到 etcd 的 watch 告诉从而更新 key A 的值为 2,这合乎最终一致性的要求。

总结

本文介绍了如何基于 etcd 的 watch 机制来实现客户端缓存,有了客户端缓存,极大地升高了拜访 etcd 的提早并进步了吞吐,非常适合读多写少的场景。在取得性能晋升的同时,客户端缓存也会带来一致性的问题,请大家依据本人的利用场景来取舍。

具体代码请参考 datenlord/etcd-client,欢送大家来探讨。

作者 | 潘政

正文完
 0