租约是什么

咱们都晓得Redis能够通过expire命令对key设置过期工夫,来实现缓存的ttl,etcd同样有一种个性能够对key设置过期工夫,也就是租约(Lease)。不过相较来说,两者的实用场景并不相同,etcd的Lease宽泛的用在服务注册与保活上,redis则次要用于淘汰缓存。上面介绍一下etcd的Lease机制,会从应用形式,以及实现原理来逐渐探索。

应用形式

首先通过一个案例简略介绍它的应用形式。

package mainimport (    "context"    "log"    "os"    "os/signal"    "syscall"    "time"    clientv3 "go.etcd.io/etcd/client/v3")func main() {    key := "linugo-lease"    cli, err := clientv3.New(clientv3.Config{        Endpoints:   []string{"127.0.0.1:23790"},        DialTimeout: time.Second,    })    if err != nil {        log.Fatal("new client err: ", err)    }    //首先创立一个Lease并通过Grant办法申请一个租约,设置ttl为20秒,没有续约的话,该租约会在20s后隐没    ls := clientv3.NewLease(cli)    grantResp, err := ls.Grant(context.TODO(), 20)    if err != nil {        log.Fatal("grant err: ", err)    }    log.Printf("grant id: %x\n", grantResp.ID)    //接下来插入一个键值对绑定该租约,该键值对会随着租约的到期而相应被删除    putResp, err := cli.Put(context.TODO(), key, "value", clientv3.WithLease(grantResp.ID))    if err != nil {        log.Fatal("put err: ", err)    }    log.Printf("create version: %v\n", putResp.Header.Revision)    //通过KeepAliveOnce办法对该租约进行续期,每隔5s会将该租约续期到初始的20s    go func() {        for {            time.Sleep(time.Second * 5)            resp, err := ls.KeepAliveOnce(context.TODO(), grantResp.ID)            if err != nil {                log.Println("keep alive once err: ", err)                break            }            log.Println("keep alive: ", resp.TTL)        }    }()    sigC := make(chan os.Signal, 1)    signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)    s := <-sigC    log.Println("exit with: ", s.String())}

咱们能够通过上述形式实现某个服务模块的保活,能够将节点的地址注册到etcd中,并绑定适当时长的租约,定时进行续约操作,若节点宕机,超过了租约时长,etcd中该节点的信息就会被移除掉,实现服务的主动摘除,通常配合etcd的watch个性来做到实时的感知。
v3版的客户端接口除了上述的Grant,KeepAliveOnce办法,还包含了一些其余重要的办法如Revoke删除某个租约,TimeToLive查看某个租约残余时长等。
etcd服务端面向租约对客户端服务的有5个接口,别离对client端的办法给予了实现。本次次要对服务端的实现办法进行剖析。

type LeaseServer interface {    //对应客户端的Grant办法,创立租约    LeaseGrant(context.Context, *LeaseGrantRequest) (*LeaseGrantResponse, error)    //删除某个租约    LeaseRevoke(context.Context, *LeaseRevokeRequest) (*LeaseRevokeResponse, error)    //租约某个续期    LeaseKeepAlive(Lease_LeaseKeepAliveServer) error    //租约残余时长查问    LeaseTimeToLive(context.Context, *LeaseTimeToLiveRequest) (*LeaseTimeToLiveResponse, error)    //查看所有租约    LeaseLeases(context.Context, *LeaseLeasesRequest) (*LeaseLeasesResponse, error)}

初始化

在etcd启动时候,会初始化一个lessor,lessor外部存储了所有无关租约的信息,包含租约ID,到期工夫,租约绑定的键值对等;lessor实现了一系列接口,是租约性能的具体实现逻辑,包含Grant(创立),Revoke(撤销),Renew(续租)等。

type lessor struct {    mu sync.RWMutex    demotec chan struct{}    //寄存所有无效的lease信息,key为leaseID,value包含该租约的ID,ttl,lease绑定的key等信息    leaseMap             map[LeaseID]*Lease    //便于查找lease的一个数据结构,基于最小堆实现,能够将快到期的租约放到队头,查看是否过期时候,只须要查看队头即可    leaseExpiredNotifier *LeaseExpiredNotifier    //用于实时更新lease的剩余时间    leaseCheckpointHeap  LeaseQueue    //用户寄存的key与lease的绑定关系,通过key能够找到租约    itemMap              map[LeaseItem]LeaseID    ......    //过期的lease会被放到该chan中,被消费者清理    expiredC chan []*Lease    ......}

在lessor被初始化后,同时会启动一个goroutine,用于频繁的查看是否有过期的lease以及更新lease剩余时间。lease的这些查看是集群的leader节点做的,包含更新残余的工夫,保护lease的最小堆,到期时候撤销lease。而follower节点只用于响应leader节点的存储、更新或撤销lease申请。

func (le *lessor) runLoop() {    defer close(le.doneC)    for {        //查看是否有过期的lease        le.revokeExpiredLeases()        //checkpoint机制查看并更新lease的剩余时间        le.checkpointScheduledLeases()        //每500毫秒查看一次        select {        case <-time.After(500 * time.Millisecond):        case <-le.stopC:            return        }    }}

为了涵盖大部分场景,咱们假如一个三节点的etcd集群的场景,通过下面的案例代码对其中的一个follower节点发动申请。

创立

当v3客户端调用Grant办法时候,会对应到server端LeaseServer的LeaseGrant办法,该办法会通过一系列的两头步骤(鉴权等)达到etcdServer包装实现的LeaseGrant办法,该办法会调用raft模块并封装一个Lease的提案并进行数据同步流程。因为此时节点是follower,会将申请转交给leader进行解决,leader接到申请后会将该提案封装成一个日志,并播送到follower节点,follower节点执行提案音讯,并回复给leader节点。

在follower节点执行提案内容时候,会解析出该申请是一个创立lease的申请,该流程是在apply模块执行的。apply模块会调用本人包装好的LeaseGrant办法。

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {    op := "unknown"    ar := &applyResult{}    ......    // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls    switch {    ......    case r.LeaseGrant != nil:        op = "LeaseGrant"        ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)    ......    default:        a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))    }    return ar}

LeaseGrant办法是对lessor实现的Grant办法的进一步封装。

func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {    l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)    resp := &pb.LeaseGrantResponse{}    if err == nil {        resp.ID = int64(l.ID)        resp.TTL = l.TTL()        resp.Header = newHeader(a.s)    }    return resp, err}lessor通过Grant办法将lease封装并存入到本人的leaseMap,并经lease长久化到boltdb。func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {    ......    //封装lease    l := &Lease{        ID:      id,        ttl:     ttl,         //用于寄存该lease绑定的key,用于在lease过期时删除key        itemSet: make(map[LeaseItem]struct{}),        revokec: make(chan struct{}),    }    ......    //如果是leader节点,则刷新lease的到期工夫    if le.isPrimary() {        l.refresh(0)    } else {        //follower节点中没有存储lease的到期工夫        l.forever()    }    le.leaseMap[id] = l    //lease信息长久化    l.persistTo(le.b)    //如果是leader节点,就将lease信息放到最小堆中    if le.isPrimary() {        item := &LeaseWithTime{id: l.ID, time: l.expiry}        le.leaseExpiredNotifier.RegisterOrUpdate(item)        le.scheduleCheckpointIfNeeded(l)    }    return l, nil}

绑定

lease创立好之后,就能够通过Put指令创立一个数据并与lease进行绑定。在Put时候,put的value字段中会有一个leaseID,并存到了boltDB。这样能够在etcd挂掉之后,能够依据长久化存储来复原lease与数据的对应关系。

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {    ......    kv := mvccpb.KeyValue{        Key:            key,        Value:          value,        CreateRevision: c,        ModRevision:    rev,        Version:        ver,        //lease字段        Lease:          int64(leaseID),    }    //....长久化等操作    //attach操作    if leaseID != lease.NoLease {        if tw.s.le == nil {            panic("no lessor to attach lease")        }        err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})        if err != nil {            panic("unexpected error from lease Attach")        }    }    tw.trace.Step("attach lease to kv pair")}

lessor的Attach操作会将lease与key两者进行绑定并存到本身的itemMap以及lease的itemSet中。

func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {    ......    l := le.leaseMap[id]    l.mu.Lock()    for _, it := range items {        //存到lease的itemSet        l.itemSet[it] = struct{}{}        //存到lessor的itemMap中        le.itemMap[it] = id    }    l.mu.Unlock()    return nil}

保活

客户端提供的keepAlive办法用于lease进行续租,每次调用都会使得lease的剩余时间回到初始化时候设定的剩余时间。因为lease的一些查看以及保护都是由leader节点维持,所以当咱们发送申请到follower时,会间接将申请重定向到leader节点。

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {    //发送到follower会返回ErrNotPrimary的谬误    ttl, err := s.lessor.Renew(id)    if err == nil { // already requested to primary lessor(leader)        return ttl, nil    }    ......    for cctx.Err() == nil && err != nil {        //获取leader节点        leader, lerr := s.waitLeader(cctx)        if lerr != nil {            return -1, lerr        }        for _, url := range leader.PeerURLs {            lurl := url + leasehttp.LeasePrefix             //通过http接口申请到leader的keeplaive接口            ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)            if err == nil || err == lease.ErrLeaseNotFound {                return ttl, err            }        }        time.Sleep(50 * time.Millisecond)    }    ......    return -1, ErrCanceled}

达到Leader节点之后会通过Renew更新该lease的剩余时间,过期工夫以及最小堆中的lease。

func (le *lessor) Renew(id LeaseID) (int64, error) {    le.mu.RLock()    if !le.isPrimary() {        le.mu.RUnlock()        return -1, ErrNotPrimary    }    demotec := le.demotec    l := le.leaseMap[id]    if l == nil {        le.mu.RUnlock()        return -1, ErrLeaseNotFound    }    //当cp(checkpoint办法,须要通过raft做数据同步的办法)不为空而且剩余时间大于0时为true    clearRemainingTTL := le.cp != nil && l.remainingTTL > 0    le.mu.RUnlock()    //如果lease过期    if l.expired() {        select {        case <-l.revokec: //revoke时候会间接返回            return -1, ErrLeaseNotFound        // The expired lease might fail to be revoked if the primary changes.        // The caller will retry on ErrNotPrimary.        case <-demotec:            return -1, ErrNotPrimary        case <-le.stopC:            return -1, ErrNotPrimary        }    }    if clearRemainingTTL {        //通过checkpoint办法同步到各个节点lease的剩余时间        le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})    }    le.mu.Lock()    l.refresh(0)    item := &LeaseWithTime{id: l.ID, time: l.expiry}    //更新最小堆中的lease    le.leaseExpiredNotifier.RegisterOrUpdate(item)    le.mu.Unlock()    return l.ttl, nil}

撤销

撤销操作能够由两种形式触发,一种是通过客户端间接调用Revoke办法被动触发,一种是leader节点检测到lease过期时候的被动触发。被动触发绝对简略,follower节点收到申请后间接调用raft模块同步该申请,各个节点收到申请后通过lessor被动删除该lease(删除并没有间接删除leaseMap中的lease,而是敞开对应revokec),以及删除绑定在下面的key。

func (le *lessor) Revoke(id LeaseID) error {    le.mu.Lock()    l := le.leaseMap[id]    //敞开告诉的管道    defer close(l.revokec)    le.mu.Unlock()    if le.rd == nil {        return nil    }    txn := le.rd()    //Keys办法会将lease中itemSet的key取出    keys := l.Keys()    sort.StringSlice(keys).Sort()    //删除lease绑定的key    for _, key := range keys {        txn.DeleteRange([]byte(key), nil)    }    le.mu.Lock()    defer le.mu.Unlock()    delete(le.leaseMap, l.ID)    //删除boltdb长久化的lease    le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))    txn.End()    return nil}

被动触发则通过创立lessor时候启动的异步协程runLoop(),每500ms轮询调用revokeExpiredLeases来查看是否过期。

func (le *lessor) revokeExpiredLeases() {    var ls []*Lease    // rate limit    revokeLimit := leaseRevokeRate / 2    le.mu.RLock()    //如果是leader节点    if le.isPrimary() {        //在leaseExpiredNotifier最小堆中找到过期的lease        ls = le.findExpiredLeases(revokeLimit)    }    le.mu.RUnlock()    if len(ls) != 0 {        select {        case <-le.stopC:            return        case le.expiredC <- ls://将过期的lease发送到expireC中        default:        }    }}

在etcd启动时候,会另外启动一个异步run协程,会订阅该expireC,收到音讯后发动一个Revoke提案并进行同步操作。

//leassor通过ExpiredLeasesC办法把expiredC裸露进去func (le *lessor) ExpiredLeasesC() <-chan []*Lease {    return le.expiredC}//etcd启动的异步run协程func (s *EtcdServer) run() {    ......    var expiredLeaseC <-chan []*lease.Lease    if s.lessor != nil {        expiredLeaseC = s.lessor.ExpiredLeasesC()    }    for{        select{           case leases := <-expiredLeaseC://接到过期音讯            s.GoAttach(func() {                for _, lease := range leases {                    ......                    lid := lease.ID                    s.GoAttach(func() {                        ctx := s.authStore.WithRoot(s.ctx)                         //调用revoke办法                        _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})                        ......                        <-c                    })                }            })           ......            //其余case操作        }    }}

小结

为了保持数据的一致性,lease的创立,删除,checkpoint等都须要通过raft模块进行同步,而在续约阶段则间接通过http申请发送到leader节点,所有的保护与查看工作都在leader节点,大体能够用下图来示意。因为作者对raft模块了解不够深刻,所以一笔带过。

Reference

  • etcd-v3.5.0源码 - https://github.com/etcd-io/et...
  • etcd 如何实现租约 - 拉钩教育,etcd原理与实际