MVCC是什么

在理解之前,首先须要明确乐观锁与乐观锁的概念。乐观锁与乐观锁是两种编程思维,并不局限与编程语言。

乐观锁

在对临界资源做一些读写时候,为了避免其他人同步批改数据,间接把数据锁住,操作实现后才会开释锁,通过这种形式实现并发平安。常见的有Go的Mutex,java的synchronized等。

乐观锁

在对临界资源做操作时候,不锁住数据实现独占,而是判断数据有没有被别人批改过,如果批改了则返回批改失败。校验是否批改常见的形式有多版本并发管制(MVCC)等。

MVCC简介

MVCC即在对数据做批改操作时候,并不对原数据做批改,而是在数据根底上追加一个批改后的数据,并通过一个惟一的版本号做辨别,版本号个别通过自增的形式;在读取数据时候,读到的理论是以后版本号对应的一份快照数据。
比方一个键值对数据K->{V.0},此时value的版本号为0。
操作1首先对数据做批改,读取到的0版本号的数据,对其做批改提交事务后便成为K-> {V.0,V.1},
操作2之后读到的数据是版本号1的数据,对其做批改后提交事务胜利变为K->{V.0, V.1, V.2}。
每次批改只是往后面进行版本号以及数据值追加,通过这种形式使得每个事务操作到的是本人版本号内的数据,实现事务之间的隔离。也能够通过指定版本号拜访对应的数据。

etcd的实现

etcd就是基于MVCC机制实现的一个键值对数据库。接下来通过一个示例演示一下。

etcd版本号

首先通过一个简略的put,get例子认识一下etcd的版本号。

#首先put一个key为linugo,value为go的数据[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo goOK#获取数据,能够看到k与v都是用了base64加密,能够看到3个version[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool{    "count": 1,    "header": {        "cluster_id": 14841639068965178418,        "member_id": 10276657743932975437,        "raft_term": 2,        "revision": 2    },    "kvs": [        {            "create_revision": 2, #创立key时候对应的版本号            "key": "bGludWdv",            "mod_revision": 2, #批改时候的版本号            "value": "Z28=",            "version": 1 #批改次数(蕴含创立次数)        }    ]}#再次put两次[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo golOK[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo golaOK[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool{    "count": 1,    "header": {        "cluster_id": 14841639068965178418,        "member_id": 10276657743932975437,        "raft_term": 2,        "revision": 4    },    "kvs": [        {            "create_revision": 2,#创立的版本号仍然是2            "key": "bGludWdv",            "mod_revision": 4,#批改时候的版本号变为了4            "value": "Z29sYQ==",            "version": 3 #批改次数为3        }    ]}

能够看到创立时候对应了一个版本号,每次批改后会生成新的版本号,是不是相似于下面所说版本号叠加呢。
接下来put一个与下面不同的键值对。

#接下来put一个不同的key[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo1 goOK#获取详细信息[XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo1 -w=json|python -m json.tool{    "count": 1,    "header": {        "cluster_id": 14841639068965178418,        "member_id": 10276657743932975437,        "raft_term": 2,        "revision": 5    },    "kvs": [        {            "create_revision": 5,#创立的版本号成为了5            "key": "bGludWdvMQ==",            "mod_revision": 5,#批改时候版本号成为了5            "value": "Z28=",            "version": 1 #批改次数为1        }    ]}

此时创立时候的版本号变成了5,这是因为etcd外面存在一个全局的总版本号revision,充当了逻辑时钟的概念,对每个key做一些批改删除操作都会触发主版本号自增。而每个key所作的就是在创立或者批改时候,记录下该value对应的主版本号。
实践上能够通过主版本号来找到任意数据的批改历史。如果双双记录的话,能够通过key查问到它对应的所有版本号,而后能够通过最新版本号找到对应的value。实际上,etcd也是这样做的。

MVCC总览

etcd保护了下面提到的两种映射关系,在内存中保护了一个B-Tree作为key与对应版本号的映射关系,这个构造叫做treeIndex,又应用了BoltDB提供了版本号与对应value的映射关系以及数据的长久化存储。

当查问一个数据时候,首先通过treeIndex定位到最新的版本号revision(如果客户端有指定版本号则查问指定的revision),再通过revision定位到对应的value。这个逻辑有点相似于MySQL中的一般索引查问。

keyIndex

在treeIndex中,key是通过keyIndex构造对应多个revision的。

type keyIndex struct {  key []byte  modified revision // the main rev of the last modification  generations []generation}
  • key:对应用户put的key
  • modified:最初一次批改对应的revision
  • generations:所有代对应的revisions

    type revision struct {main int64sub int64}

    revision也并不是一个单纯的数值类型,由两个字段组成

  • main:主版本号,也就对应etcd中的主版本号
  • sub:子事务版本号,在一个事务中可能蕴含多个子事务(比方一个Txn申请中蕴含多个put操作,主版本号main不会变,每个put操作会自增生成不一样的sub)

    type generation struct {ver int64created revision // when the generation is created (put in first revision).revs []revision}

    在etcd中,对数据做删除操作并没有对数据做删除,而是在generations数组后追加了一个新的generation元素,然而如果想通过版本号获取曾经del的数据时也是获取不到的。在获取key最新revision时候,只须要找到generations数组最初一个generation,并找到其中revs的最初一个revision元素。

  • ver:本generation中的key的批改次数,对应下面示例中key的批改次数
  • created:创立时候对应的全局版本号revision
  • revs:key各个版本对应的版本号

    Example

    上面通过一个示例来探索一下三者的转化关系。

  • 首先put一个新的key,查看它的keyIndex,初始版本号为112,如下图左;
  • 对该key进行两次put操作,查看keyindex,版本号自增2,ver批改次数变为3,revs中减少了两个元素,两头图;
  • 删除该key,查看keyindex,版本号自增,generation切片减少了一位,原revs也减少了一位,对应删除操作。如下图右。

KeyValue

接下来看下boltDB中该key对应的数据并不是一个单纯的value,同样也蕴含了很多其余的字段

type KeyValue struct {  Key []byte //对应用户传入的key  CreateRevision int64  //创立时候的版本号  ModRevision int64  //最初一次批改的版本号  Version int64  //该key批改次数  Value []byte //传入的value值  Lease int64  //租约ID}

因为数据是通过boltDB做的磁盘长久化,所以在每次查问或者批改时候间接应用boltDB走磁盘读写会导致肯定的性能问题,所以在拜访boltDB之前有一个缓存buffer,buffer有两块,一块读buffer(txReadBuffer),位于baseReadTx,用于读事务;一块写buffer(txWriteBuffer),位于batchTxBuffered,用于写操作以及刷盘等。

type txWriteBuffer struct {  txBuffer  bucket2seq map[BucketID]bool}type txReadBuffer struct {  txBuffer  bufVersion uint64}

读取时候首先从buffer中读取,没有命中则从boltdb中读取;
写的时候间接写到buffer,在End事务提交时候,与读buffer进行merge操作,后盾协程会定时对写buffer落盘。

读数据源码调用关系

读申请在etcd中对立对应了Range办法,申请在下层通过了封装,拦截器校验,raft数据同步等流程。

//etcdserver/apply.gofunc (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {    ......  if txn == nil {    //初始化一个读事务,对局部共享区域加读锁,获取以后最新版本号,    txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)    //事务完结,提交事务,对加锁的局部解锁        defer txn.End()  }  ......  //调用读事务实现的Range办法,通过key获取对应的value(因为反对范畴查找,所以返回的value构造是切片)  rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)  if err != nil {    return nil, err  }  //封装后果,返回客户端  resp.Header.Revision = rr.Rev  resp.Count = int64(rr.Count)  resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))  for i := range rr.KVs {    if r.KeysOnly {      rr.KVs[i].Value = nil    }    resp.Kvs[i] = &rr.KVs[i]  }  return resp, nil}

接下来看Range办法的具体实现

//mvcc/metrics_txn.gofunc (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) {  tw.ranges++ //用于申请指标的统计  return tw.TxnWrite.Range(ctx, key, end, ro)}//mvcc/kvstore_txn.gofunc (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {  return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)}func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {  //测验版本号是否失常的操作  ......  //获取treeindex中指定key的符合条件的版本号信息,因为可能是范畴查找,版本号信息revpairs也属于切片类型  revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))  ......  kvs := make([]mvccpb.KeyValue, limit)  revBytes := newRevBytes()  //对查出来的饿  for i, revpair := range revpairs[:len(kvs)] {     //校验是否超时的操作        ......     //通过版本号查找对应value的操作    revToBytes(revpair, revBytes)    _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)    if len(vs) != 1 {      ......    }    //将查找到的value进行反序列化    if err := kvs[i].Unmarshal(vs[0]); err != nil {      ......    }  }  return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil}

查找revision

//mvcc/index.gofunc (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {     //对应查问单个key    if end == nil {    //查找treeindex中key对应的keyIndex    rev, _, _, err := ti.Get(key, atRev)    ...    return []revision{rev}, 1  }  //对应范畴查找  ti.visit(key, end, func(ki *keyIndex) bool {    ......  })  return revs, total}func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {  keyi := &keyIndex{key: key} //查问进去的keyindex  ti.RLock()  defer ti.RUnlock()  //查问进去key对应的keyIndex构造  if keyi = ti.keyIndex(keyi); keyi == nil {    return revision{}, revision{}, 0, ErrRevisionNotFound  }  //从keyindex构造中提取进去须要的信息,即返回的信息  return keyi.get(ti.lg, atRev)}func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {  //找到无效的generation,atRev是指以后的revision或者客户端传入的指定revision  g := ki.findGeneration(atRev)  //walk找到无效的revision,即比atRev版本号靠前而且间隔atRev最近的一个版本号  n := g.walk(func(rev revision) bool { return rev.main > atRev })  if n != -1 {    //返回最终批改时的revision(即数组revs中最初的元素),创立时的revision,批改次数等    return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil  }  return revision{}, revision{}, 0, ErrRevisionNotFound}

查value

通过下面的Revisions办法获取到了Revision,接下来要通过UnsafeRange查找到revision对应的value。该办法首先从读缓存中查找,查找不到去boltdb中查找。

//mvcc/backend/read_tx.gofunc (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {  //对获取数量limit的测验  ....  //首先从缓存readbuffer中读取,如果获取到间接返回。  keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit) //从buffer中读取,看看能读到不  if int64(len(keys)) == limit {    return keys, vals  }    ......  //去boltDB中获取  k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))  return append(k2, keys...), append(v2, vals...)}

写数据源码调用关系

写数据对立对应put办法,也会通过一系列的鉴权等前置步骤。

//etcdserver/apply.gofunc (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {  ......  if txn == nil {    ......        //初始化一个写事务,对一些变量加锁等操作    txn = a.s.KV().Write(trace)    //写事务提交        defer txn.End()  }  ......  //调用写事务的put办法,返回是否put后的版本号  resp.Header.Revision = txn.Put(p.Key, val, leaseID)  return resp, trace, nil}//mvcc/metrics_txn.gofunc (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {  //用于指标统计的一些数据    ......    //调用put办法  return tw.TxnWrite.Put(key, value, lease)}//mvcc/kvstore_txn.gofunc (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { //put操作  tw.put(key, value, lease)  return tw.beginRev + 1}

put会生成新的版本号以及同步缓存的操作。

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {  rev := tw.beginRev + 1  c := rev  oldLease := lease.NoLease  //首先查看该key是否存在,如果存在,则返回创立时的版本号(用于封装存储的value),批改次数等信息  _, created, ver, err := tw.s.kvindex.Get(key, rev)  if err == nil {    c = created.main    ......  }  ibytes := newRevBytes()  //生成一个revision  idxRev := revision{main: rev, sub: int64(len(tw.changes))}  revToBytes(idxRev, ibytes)  //批改次数加一  ver = ver + 1  //封装成要长久化存储的value  kv := mvccpb.KeyValue{    Key: key,    Value: value,    CreateRevision: c,    ModRevision: rev,    Version: ver,    Lease: int64(leaseID),  }  //序列化value  d, err := kv.Marshal()  //将数据(revision:value)存入buffer和boltDB中,此时并未长久化  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)  //在treeIndex中退出(key:revision)的对应关系,就是在对应keyIndex的generations中revs前面追加一个revsion  tw.s.kvindex.Put(key, idxRev)  ......}//mvcc/backend/batch_tx.gofunc (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {  //在boltdb中退出该数据,但并未提交  t.batchTx.UnsafeSeqPut(bucket, key, value)  //在缓存buffer中退出该数据  t.buf.putSeq(bucket, key, value)}func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {  t.unsafePut(bucket, key, value, true)}func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {  if err := bucket.Put(key, value); err != nil {  ......  }     //pending标记位自增,用于前面的合并缓存以及长久化数据    t.pending++}

缓存合并

在申请完结时候,会调用txn对应的End()办法提交事务,

//mvcc/kvstore_txn.gounc (tw *storeTxnWrite) End() {  ......  //调用写事务的Unlock  tw.tx.Unlock()}//mvcc/backend/batch_tx.gofunc (t *batchTxBuffered) Unlock() {  //pending不等于0阐明有写操作  if t.pending != 0 {    //读buffer加锁,在此期间读申请被阻塞    t.backend.readTx.Lock()    //合并两个缓存    t.buf.writeback(&t.backend.readTx.buf)    t.backend.readTx.Unlock()    //如果超过限度则间接进行长久化操作    if t.pending >= t.backend.batchLimit {      t.commit(false)    }  }  t.batchTx.Unlock()}//mvcc/backend/tx_buffer.gofunc (txw *txWriteBuffer) writeback(txr *txReadBuffer) {  //遍历写buffer并对读buffer进行merge  for k, wb := range txw.buckets {    rb, ok := txr.buckets[k]    if !ok {      delete(txw.buckets, k)      txr.buckets[k] = wb      continue    }    ......    rb.merge(wb)  }    ......}

数据长久化

下面在调用put的时候将数据存入的buffer与boltDB中,然而并未进行长久化操作。真早的长久化操作是在一个后盾backend协程中执行的,这个后盾协程会在etcd启动时开始工作,次要负责退出时候的资源长久化以及定期进行数据的磁盘长久化。

//mvcc/backend/backend.gofunc (b *backend) run() {  defer close(b.donec)  t := time.NewTimer(b.batchInterval)  defer t.Stop()  for {    select {    case <-t.C:    case <-b.stopc://收到退出信号,则进行磁盘长久化后退出协程      b.batchTx.CommitAndStop()      return    }        //获取pending数据,如果不为0,则进行commit    if b.batchTx.safePending() != 0 {      b.batchTx.Commit()    }    t.Reset(b.batchInterval)  }}//mvcc/backend/batch_tx.gofunc (t *batchTxBuffered) Commit() {  t.Lock()  t.commit(false)  t.Unlock()}

数据删除

数据删除时候,数据并没有真正的删除掉,只是在keyIndex中的generations数组中减少了一个新的generation元素。删除操作对应到etcd中的DeleteRange办法,删除满足条件的数据。

//mvcc/kvstore_txn.gofunc (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {  rrev := tw.beginRev  if len(tw.changes) > 0 {    rrev++  }  //寻找符合条件的key  keys, _ := tw.s.kvindex.Range(key, end, rrev)  if len(keys) == 0 {    return 0  }  for _, key := range keys {    //删除操作    tw.delete(key)  }  return int64(len(keys))}func (tw *storeTxnWrite) delete(key []byte) {  ibytes := newRevBytes()  //生成一个revision  idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}  revToBytes(idxRev, ibytes)  //打一个标记  ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)  kv := mvccpb.KeyValue{Key: key}  d, err := kv.Marshal()  ......  //将数据删除的记录写到boltDB以及buffer  tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)  //调用Tombstone  err = tw.s.kvindex.Tombstone(key, idxRev)  ......}func (ti *treeIndex) Tombstone(key []byte, rev revision) error {  keyi := &keyIndex{key: key}  ti.Lock()  defer ti.Unlock()  //获取keyindex  item := ti.tree.Get(keyi)  if item == nil {    return ErrRevisionNotFound  }  ki := item.(*keyIndex)  return ki.tombstone(ti.lg, rev.main, rev.sub)}func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {  ......  //将删除操作的对应版本号写入到keyindex  ki.put(lg, main, sub)  //追加空的generation  ki.generations = append(ki.generations, generation{})  keysGauge.Dec()  return nil}

小结

本文开始剖析了etcd实现MVCC的原理,之后从源码角度追溯mvcc的具体实现,仅对大体流程以及总体思路的源码进行了追溯,对于一些具体的实现并没有深刻探索上来(如boltDB存储,buffer存储,etcd事务等),一些想要理解更深刻的同学还须要本人更细化的读一下代码。理解etcd MVCC最重要的是弄懂keyIndex,revision,generation这三个数据结构。

reference

  • etcd-v3.5.0源码:https://github.com/etcd-io/et...
  • 《etcd原理与实际》:etcd如何实现MVCC