乐趣区

关于etcd:etcd的MVCC是怎么实现的

MVCC 是什么

在理解之前,首先须要明确乐观锁与乐观锁的概念。乐观锁与乐观锁是两种编程思维,并不局限与编程语言。

乐观锁

在对临界资源做一些读写时候,为了避免其他人同步批改数据,间接把数据锁住,操作实现后才会开释锁,通过这种形式实现并发平安。常见的有 Go 的 Mutex,java 的 synchronized 等。

乐观锁

在对临界资源做操作时候,不锁住数据实现独占,而是判断数据有没有被别人批改过,如果批改了则返回批改失败。校验是否批改常见的形式有多版本并发管制(MVCC)等。

MVCC 简介

MVCC 即在对数据做批改操作时候,并不对原数据做批改,而是在数据根底上追加一个批改后的数据,并通过一个惟一的版本号做辨别,版本号个别通过自增的形式;在读取数据时候,读到的理论是以后版本号对应的一份快照数据。
比方一个键值对数据 K ->{V.0},此时 value 的版本号为 0。
操作 1 首先对数据做批改,读取到的 0 版本号的数据,对其做批改提交事务后便成为 K -> {V.0,V.1},
操作 2 之后读到的数据是版本号 1 的数据,对其做批改后提交事务胜利变为 K ->{V.0, V.1, V.2}。
每次批改只是往后面进行版本号以及数据值追加,通过这种形式使得每个事务操作到的是本人版本号内的数据,实现事务之间的隔离。也能够通过指定版本号拜访对应的数据。

etcd 的实现

etcd 就是基于 MVCC 机制实现的一个键值对数据库。接下来通过一个示例演示一下。

etcd 版本号

首先通过一个简略的 put,get 例子认识一下 etcd 的版本号。

# 首先 put 一个 key 为 linugo,value 为 go 的数据
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo go
OK
#获取数据,能够看到 k 与 v 都是用了 base64 加密,能够看到 3 个 version
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 2
    },
    "kvs": [
        {
            "create_revision": 2, #创立 key 时候对应的版本号
            "key": "bGludWdv",
            "mod_revision": 2, #批改时候的版本号
            "value": "Z28=",
            "version": 1 #批改次数(蕴含创立次数)}
    ]
}
#再次 put 两次
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gol
OK
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gola
OK
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 4
    },
    "kvs": [
        {
            "create_revision": 2,# 创立的版本号仍然是 2
            "key": "bGludWdv",
            "mod_revision": 4,# 批改时候的版本号变为了 4
            "value": "Z29sYQ==",
            "version": 3 #批改次数为 3
        }
    ]
}

能够看到创立时候对应了一个版本号,每次批改后会生成新的版本号,是不是相似于下面所说版本号叠加呢。
接下来 put 一个与下面不同的键值对。

# 接下来 put 一个不同的 key
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo1 go
OK
#获取详细信息
[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo1 -w=json|python -m json.tool
{
    "count": 1,
    "header": {
        "cluster_id": 14841639068965178418,
        "member_id": 10276657743932975437,
        "raft_term": 2,
        "revision": 5
    },
    "kvs": [
        {
            "create_revision": 5,# 创立的版本号成为了 5
            "key": "bGludWdvMQ==",
            "mod_revision": 5,# 批改时候版本号成为了 5
            "value": "Z28=",
            "version": 1 #批改次数为 1
        }
    ]
}

此时创立时候的版本号变成了 5,这是因为 etcd 外面存在一个全局的总版本号 revision,充当了逻辑时钟的概念,对每个 key 做一些批改删除操作都会触发主版本号自增。而每个 key 所作的就是在创立或者批改时候,记录下该 value 对应的主版本号。
实践上能够通过主版本号来找到任意数据的批改历史。如果双双记录的话,能够通过 key 查问到它对应的所有版本号,而后能够通过最新版本号找到对应的 value。实际上,etcd 也是这样做的。

MVCC 总览

etcd 保护了下面提到的两种映射关系,在内存中保护了一个 B -Tree 作为 key 与对应版本号的映射关系,这个构造叫做 treeIndex,又应用了 BoltDB 提供了版本号与对应 value 的映射关系以及数据的长久化存储。

当查问一个数据时候,首先通过 treeIndex 定位到最新的版本号 revision(如果客户端有指定版本号则查问指定的 revision),再通过 revision 定位到对应的 value。这个逻辑有点相似于 MySQL 中的一般索引查问。

keyIndex

在 treeIndex 中,key 是通过 keyIndex 构造对应多个 revision 的。

type keyIndex struct {key []byte
  modified revision // the main rev of the last modification
  generations []generation}
  • key:对应用户 put 的 key
  • modified:最初一次批改对应的 revision
  • generations:所有代对应的 revisions

    type revision struct {
    main int64
    sub int64
    }

    revision 也并不是一个单纯的数值类型,由两个字段组成

  • main:主版本号,也就对应 etcd 中的主版本号
  • sub:子事务版本号,在一个事务中可能蕴含多个子事务(比方一个 Txn 申请中蕴含多个 put 操作,主版本号 main 不会变,每个 put 操作会自增生成不一样的 sub)

    type generation struct {
    ver int64
    created revision // when the generation is created (put in first revision).
    revs []revision}

    在 etcd 中,对数据做删除操作并没有对数据做删除,而是在 generations 数组后追加了一个新的 generation 元素,然而如果想通过版本号获取曾经 del 的数据时也是获取不到的。在获取 key 最新 revision 时候,只须要找到 generations 数组最初一个 generation,并找到其中 revs 的最初一个 revision 元素。

  • ver:本 generation 中的 key 的批改次数,对应下面示例中 key 的批改次数
  • created:创立时候对应的全局版本号 revision
  • revs:key 各个版本对应的版本号

    Example

    上面通过一个示例来探索一下三者的转化关系。

  • 首先 put 一个新的 key,查看它的 keyIndex,初始版本号为 112,如下图左;
  • 对该 key 进行两次 put 操作,查看 keyindex,版本号自增 2,ver 批改次数变为 3,revs 中减少了两个元素,两头图;
  • 删除该 key,查看 keyindex,版本号自增,generation 切片减少了一位,原 revs 也减少了一位,对应删除操作。如下图右。

KeyValue

接下来看下 boltDB 中该 key 对应的数据并不是一个单纯的 value,同样也蕴含了很多其余的字段

type KeyValue struct {Key []byte // 对应用户传入的 key
  CreateRevision int64  // 创立时候的版本号
  ModRevision int64  // 最初一次批改的版本号
  Version int64  // 该 key 批改次数
  Value []byte // 传入的 value 值
  Lease int64  // 租约 ID
}

因为数据是通过 boltDB 做的磁盘长久化,所以在每次查问或者批改时候间接应用 boltDB 走磁盘读写会导致肯定的性能问题,所以在拜访 boltDB 之前有一个缓存 buffer,buffer 有两块,一块读 buffer(txReadBuffer),位于 baseReadTx,用于读事务;一块写 buffer(txWriteBuffer),位于 batchTxBuffered,用于写操作以及刷盘等。

type txWriteBuffer struct {
  txBuffer
  bucket2seq map[BucketID]bool
}

type txReadBuffer struct {
  txBuffer
  bufVersion uint64
}

读取时候首先从 buffer 中读取,没有命中则从 boltdb 中读取;
写的时候间接写到 buffer,在 End 事务提交时候,与读 buffer 进行 merge 操作,后盾协程会定时对写 buffer 落盘。

读数据源码调用关系

读申请在 etcd 中对立对应了 Range 办法,申请在下层通过了封装,拦截器校验,raft 数据同步等流程。

//etcdserver/apply.go
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
  
  ......
  if txn == nil {
    // 初始化一个读事务,对局部共享区域加读锁,获取以后最新版本号,
    txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
    // 事务完结, 提交事务,对加锁的局部解锁
        defer txn.End()}
  ......

  // 调用读事务实现的 Range 办法,通过 key 获取对应的 value(因为反对范畴查找,所以返回的 value 构造是切片)rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
  if err != nil {return nil, err}

  // 封装后果,返回客户端
  resp.Header.Revision = rr.Rev
  resp.Count = int64(rr.Count)
  resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
  for i := range rr.KVs {
    if r.KeysOnly {rr.KVs[i].Value = nil
    }
    resp.Kvs[i] = &rr.KVs[i]
  }
  return resp, nil
}

接下来看 Range 办法的具体实现

//mvcc/metrics_txn.go
func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) {
  tw.ranges++ // 用于申请指标的统计
  return tw.TxnWrite.Range(ctx, key, end, ro)
}
//mvcc/kvstore_txn.go
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
  // 测验版本号是否失常的操作
  ......
  // 获取 treeindex 中指定 key 的符合条件的版本号信息, 因为可能是范畴查找,版本号信息 revpairs 也属于切片类型
  revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
  ......
  kvs := make([]mvccpb.KeyValue, limit)
  revBytes := newRevBytes()
  // 对查出来的饿
  for i, revpair := range revpairs[:len(kvs)] {
     // 校验是否超时的操作
        ......
     // 通过版本号查找对应 value 的操作
    revToBytes(revpair, revBytes)
    _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
    if len(vs) != 1 {......}
    // 将查找到的 value 进行反序列化
    if err := kvs[i].Unmarshal(vs[0]); err != nil {......}
  }
  return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
}

查找 revision

//mvcc/index.go
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
     // 对应查问单个 key
    if end == nil {
    // 查找 treeindex 中 key 对应的 keyIndex
    rev, _, _, err := ti.Get(key, atRev)
    ...
    return []revision{rev}, 1
  }
  // 对应范畴查找
  ti.visit(key, end, func(ki *keyIndex) bool {......})
  return revs, total
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {keyi := &keyIndex{key: key} // 查问进去的 keyindex
  ti.RLock()
  defer ti.RUnlock()
  // 查问进去 key 对应的 keyIndex 构造
  if keyi = ti.keyIndex(keyi); keyi == nil {return revision{}, revision{}, 0, ErrRevisionNotFound}
  // 从 keyindex 构造中提取进去须要的信息,即返回的信息
  return keyi.get(ti.lg, atRev)
}

func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
  // 找到无效的 generation,atRev 是指以后的 revision 或者客户端传入的指定 revision
  g := ki.findGeneration(atRev)
  //walk 找到无效的 revision,即比 atRev 版本号靠前而且间隔 atRev 最近的一个版本号
  n := g.walk(func(rev revision) bool {return rev.main > atRev})
  if n != -1 {
    // 返回最终批改时的 revision(即数组 revs 中最初的元素),创立时的 revision,批改次数等
    return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
  }

  return revision{}, revision{}, 0, ErrRevisionNotFound
}

查 value

通过下面的 Revisions 办法获取到了 Revision,接下来要通过 UnsafeRange 查找到 revision 对应的 value。该办法首先从读缓存中查找,查找不到去 boltdb 中查找。

//mvcc/backend/read_tx.go
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  // 对获取数量 limit 的测验
  ....
  // 首先从缓存 readbuffer 中读取,如果获取到间接返回。keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit) // 从 buffer 中读取,看看能读到不
  if int64(len(keys)) == limit {return keys, vals}
    ......
  // 去 boltDB 中获取
  k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  return append(k2, keys...), append(v2, vals...)
}

写数据源码调用关系

写数据对立对应 put 办法,也会通过一系列的鉴权等前置步骤。

//etcdserver/apply.go
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
  ......
  if txn == nil {
    ......
        // 初始化一个写事务,对一些变量加锁等操作
    txn = a.s.KV().Write(trace)
    // 写事务提交
        defer txn.End()}
  ......
  // 调用写事务的 put 办法,返回是否 put 后的版本号
  resp.Header.Revision = txn.Put(p.Key, val, leaseID)
  return resp, trace, nil
}
//mvcc/metrics_txn.go
func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
  // 用于指标统计的一些数据
    ......
    // 调用 put 办法
  return tw.TxnWrite.Put(key, value, lease)
}

//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
 //put 操作
  tw.put(key, value, lease)
  return tw.beginRev + 1
}

put 会生成新的版本号以及同步缓存的操作。

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
  rev := tw.beginRev + 1
  c := rev
  oldLease := lease.NoLease

  // 首先查看该 key 是否存在,如果存在,则返回创立时的版本号(用于封装存储的 value),批改次数等信息
  _, created, ver, err := tw.s.kvindex.Get(key, rev)
  if err == nil {
    c = created.main
    ......
  }
  ibytes := newRevBytes()
  // 生成一个 revision
  idxRev := revision{main: rev, sub: int64(len(tw.changes))}
  revToBytes(idxRev, ibytes)
  // 批改次数加一
  ver = ver + 1
  // 封装成要长久化存储的 value
  kv := mvccpb.KeyValue{
    Key: key,
    Value: value,
    CreateRevision: c,
    ModRevision: rev,
    Version: ver,
    Lease: int64(leaseID),
  }
  // 序列化 value
  d, err := kv.Marshal()
  // 将数据 (revision:value) 存入 buffer 和 boltDB 中,此时并未长久化
  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
  // 在 treeIndex 中退出 (key:revision) 的对应关系,就是在对应 keyIndex 的 generations 中 revs 前面追加一个 revsion
  tw.s.kvindex.Put(key, idxRev)
  ......
}

//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
  // 在 boltdb 中退出该数据,但并未提交
  t.batchTx.UnsafeSeqPut(bucket, key, value)
  // 在缓存 buffer 中退出该数据
  t.buf.putSeq(bucket, key, value)
}

func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {if err := bucket.Put(key, value); err != nil {......}
     //pending 标记位自增,用于前面的合并缓存以及长久化数据
    t.pending++
}

缓存合并

在申请完结时候,会调用 txn 对应的 End()办法提交事务,

//mvcc/kvstore_txn.go
unc (tw *storeTxnWrite) End() {
  ......
  // 调用写事务的 Unlock
  tw.tx.Unlock()}
//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) Unlock() {
  //pending 不等于 0 阐明有写操作
  if t.pending != 0 {
    // 读 buffer 加锁,在此期间读申请被阻塞
    t.backend.readTx.Lock()
    // 合并两个缓存
    t.buf.writeback(&t.backend.readTx.buf)
    t.backend.readTx.Unlock()
    // 如果超过限度则间接进行长久化操作
    if t.pending >= t.backend.batchLimit {t.commit(false)
    }
  }
  t.batchTx.Unlock()}
//mvcc/backend/tx_buffer.go
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
  // 遍历写 buffer 并对读 buffer 进行 merge
  for k, wb := range txw.buckets {rb, ok := txr.buckets[k]
    if !ok {delete(txw.buckets, k)
      txr.buckets[k] = wb
      continue
    }
    ......
    rb.merge(wb)
  }
    ......
}

数据长久化

下面在调用 put 的时候将数据存入的 buffer 与 boltDB 中,然而并未进行长久化操作。真早的长久化操作是在一个后盾 backend 协程中执行的,这个后盾协程会在 etcd 启动时开始工作,次要负责退出时候的资源长久化以及定期进行数据的磁盘长久化。

//mvcc/backend/backend.go
func (b *backend) run() {defer close(b.donec)
  t := time.NewTimer(b.batchInterval)
  defer t.Stop()
  for {
    select {
    case <-t.C:
    case <-b.stopc:// 收到退出信号,则进行磁盘长久化后退出协程
      b.batchTx.CommitAndStop()
      return
    }
        // 获取 pending 数据,如果不为 0,则进行 commit
    if b.batchTx.safePending() != 0 {b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)
  }
}
//mvcc/backend/batch_tx.go
func (t *batchTxBuffered) Commit() {t.Lock()
  t.commit(false)
  t.Unlock()}

数据删除

数据删除时候,数据并没有真正的删除掉,只是在 keyIndex 中的 generations 数组中减少了一个新的 generation 元素。删除操作对应到 etcd 中的 DeleteRange 办法,删除满足条件的数据。

//mvcc/kvstore_txn.go
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
  rrev := tw.beginRev
  if len(tw.changes) > 0 {rrev++}
  // 寻找符合条件的 key
  keys, _ := tw.s.kvindex.Range(key, end, rrev)
  if len(keys) == 0 {return 0}
  for _, key := range keys {
    // 删除操作
    tw.delete(key)
  }
  return int64(len(keys))
}

func (tw *storeTxnWrite) delete(key []byte) {ibytes := newRevBytes()
  // 生成一个 revision
  idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
  revToBytes(idxRev, ibytes)
  // 打一个标记
  ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
  kv := mvccpb.KeyValue{Key: key}
  d, err := kv.Marshal()
  ......
  // 将数据删除的记录写到 boltDB 以及 buffer
  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
  // 调用 Tombstone
  err = tw.s.kvindex.Tombstone(key, idxRev)
  ......
}

func (ti *treeIndex) Tombstone(key []byte, rev revision) error {keyi := &keyIndex{key: key}
  ti.Lock()
  defer ti.Unlock()
  // 获取 keyindex
  item := ti.tree.Get(keyi)
  if item == nil {return ErrRevisionNotFound}
  ki := item.(*keyIndex)
  return ki.tombstone(ti.lg, rev.main, rev.sub)
}

func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
  ......
  // 将删除操作的对应版本号写入到 keyindex
  ki.put(lg, main, sub)
  // 追加空的 generation
  ki.generations = append(ki.generations, generation{})
  keysGauge.Dec()
  return nil
}

小结

本文开始剖析了 etcd 实现 MVCC 的原理,之后从源码角度追溯 mvcc 的具体实现,仅对大体流程以及总体思路的源码进行了追溯,对于一些具体的实现并没有深刻探索上来(如 boltDB 存储,buffer 存储,etcd 事务等),一些想要理解更深刻的同学还须要本人更细化的读一下代码。理解 etcd MVCC 最重要的是弄懂 keyIndex,revision,generation 这三个数据结构。

reference

  • etcd-v3.5.0 源码:https://github.com/etcd-io/et…
  • 《etcd 原理与实际》:etcd 如何实现 MVCC
退出移动版