乐趣区

关于etcd:etcd的租约是怎么实现的

租约是什么

咱们都晓得 Redis 能够通过 expire 命令对 key 设置过期工夫,来实现缓存的 ttl,etcd 同样有一种个性能够对 key 设置过期工夫,也就是租约(Lease)。不过相较来说,两者的实用场景并不相同,etcd 的 Lease 宽泛的用在服务注册与保活上,redis 则次要用于淘汰缓存。上面介绍一下 etcd 的 Lease 机制,会从应用形式,以及实现原理来逐渐探索。

应用形式

首先通过一个案例简略介绍它的应用形式。

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
​
    clientv3 "go.etcd.io/etcd/client/v3"
)
​
func main() {
​
    key := "linugo-lease"
​
    cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"127.0.0.1:23790"},
        DialTimeout: time.Second,
    })
    if err != nil {log.Fatal("new client err:", err)
    }
​
    // 首先创立一个 Lease 并通过 Grant 办法申请一个租约,设置 ttl 为 20 秒,没有续约的话,该租约会在 20s 后隐没
    ls := clientv3.NewLease(cli)
    grantResp, err := ls.Grant(context.TODO(), 20)
    if err != nil {log.Fatal("grant err:", err)
    }
    log.Printf("grant id: %x\n", grantResp.ID)
​
    // 接下来插入一个键值对绑定该租约,该键值对会随着租约的到期而相应被删除
    putResp, err := cli.Put(context.TODO(), key, "value", clientv3.WithLease(grantResp.ID))
    if err != nil {log.Fatal("put err:", err)
    }
​
    log.Printf("create version: %v\n", putResp.Header.Revision)
    // 通过 KeepAliveOnce 办法对该租约进行续期,每隔 5s 会将该租约续期到初始的 20s
    go func() {
        for {time.Sleep(time.Second * 5)
            resp, err := ls.KeepAliveOnce(context.TODO(), grantResp.ID)
            if err != nil {log.Println("keep alive once err:", err)
                break
            }
            log.Println("keep alive:", resp.TTL)
        }
    }()
​
    sigC := make(chan os.Signal, 1)
    signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
    s := <-sigC
    log.Println("exit with:", s.String())
}

咱们能够通过上述形式实现某个服务模块的保活,能够将节点的地址注册到 etcd 中,并绑定适当时长的租约,定时进行续约操作,若节点宕机,超过了租约时长,etcd 中该节点的信息就会被移除掉,实现服务的主动摘除,通常配合 etcd 的 watch 个性来做到实时的感知。
v3 版的客户端接口除了上述的 Grant,KeepAliveOnce 办法,还包含了一些其余重要的办法如 Revoke 删除某个租约,TimeToLive 查看某个租约残余时长等。
etcd 服务端面向租约对客户端服务的有 5 个接口,别离对 client 端的办法给予了实现。本次次要对服务端的实现办法进行剖析。

type LeaseServer interface {
    // 对应客户端的 Grant 办法,创立租约
    LeaseGrant(context.Context, *LeaseGrantRequest) (*LeaseGrantResponse, error)
    // 删除某个租约
    LeaseRevoke(context.Context, *LeaseRevokeRequest) (*LeaseRevokeResponse, error)
    // 租约某个续期
    LeaseKeepAlive(Lease_LeaseKeepAliveServer) error
    // 租约残余时长查问
    LeaseTimeToLive(context.Context, *LeaseTimeToLiveRequest) (*LeaseTimeToLiveResponse, error)
    // 查看所有租约
    LeaseLeases(context.Context, *LeaseLeasesRequest) (*LeaseLeasesResponse, error)
}

初始化

在 etcd 启动时候,会初始化一个 lessor,lessor 外部存储了所有无关租约的信息,包含租约 ID,到期工夫,租约绑定的键值对等;lessor 实现了一系列接口,是租约性能的具体实现逻辑,包含 Grant(创立),Revoke(撤销),Renew(续租) 等。

type lessor struct {
    mu sync.RWMutex
    demotec chan struct{}
    // 寄存所有无效的 lease 信息,key 为 leaseID,value 包含该租约的 ID,ttl,lease 绑定的 key 等信息
    leaseMap             map[LeaseID]*Lease
    // 便于查找 lease 的一个数据结构,基于最小堆实现,能够将快到期的租约放到队头,查看是否过期时候,只须要查看队头即可
    leaseExpiredNotifier *LeaseExpiredNotifier
    // 用于实时更新 lease 的剩余时间
    leaseCheckpointHeap  LeaseQueue
    // 用户寄存的 key 与 lease 的绑定关系,通过 key 能够找到租约
    itemMap              map[LeaseItem]LeaseID
    ......
    // 过期的 lease 会被放到该 chan 中,被消费者清理
    expiredC chan []*Lease
    ......
}

在 lessor 被初始化后,同时会启动一个 goroutine,用于频繁的查看是否有过期的 lease 以及更新 lease 剩余时间。lease 的这些查看是集群的 leader 节点做的,包含更新残余的工夫,保护 lease 的最小堆,到期时候撤销 lease。而 follower 节点只用于响应 leader 节点的存储、更新或撤销 lease 申请。

func (le *lessor) runLoop() {defer close(le.doneC)
    for {
        // 查看是否有过期的 lease
        le.revokeExpiredLeases()
        //checkpoint 机制查看并更新 lease 的剩余时间
        le.checkpointScheduledLeases()
        // 每 500 毫秒查看一次
        select {case <-time.After(500 * time.Millisecond):
        case <-le.stopC:
            return
        }
    }
}

为了涵盖大部分场景,咱们假如一个三节点的 etcd 集群的场景,通过下面的案例代码对其中的一个 follower 节点发动申请。

创立

当 v3 客户端调用 Grant 办法时候,会对应到 server 端 LeaseServer 的 LeaseGrant 办法,该办法会通过一系列的两头步骤(鉴权等)达到 etcdServer 包装实现的 LeaseGrant 办法,该办法会调用 raft 模块并封装一个 Lease 的提案并进行数据同步流程。因为此时节点是 follower,会将申请转交给 leader 进行解决,leader 接到申请后会将该提案封装成一个日志,并播送到 follower 节点,follower 节点执行提案音讯,并回复给 leader 节点。

在 follower 节点执行提案内容时候,会解析出该申请是一个创立 lease 的申请,该流程是在 apply 模块执行的。apply 模块会调用本人包装好的 LeaseGrant 办法。

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
    op := "unknown"
    ar := &applyResult{}
    ......
​
    // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
    switch {
    ......
    case r.LeaseGrant != nil:
        op = "LeaseGrant"
        ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
    ......
    default:
        a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
    }
    return ar
}

LeaseGrant 办法是对 lessor 实现的 Grant 办法的进一步封装。

func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
    resp := &pb.LeaseGrantResponse{}
    if err == nil {resp.ID = int64(l.ID)
        resp.TTL = l.TTL()
        resp.Header = newHeader(a.s)
    }
    return resp, err
}
lessor 通过 Grant 办法将 lease 封装并存入到本人的 leaseMap,并经 lease 长久化到 boltdb。func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
    ......
    // 封装 lease
    l := &Lease{
        ID:      id,
        ttl:     ttl,
         // 用于寄存该 lease 绑定的 key,用于在 lease 过期时删除 key
        itemSet: make(map[LeaseItem]struct{}),
        revokec: make(chan struct{}),
    }
    ......
    // 如果是 leader 节点,则刷新 lease 的到期工夫
    if le.isPrimary() {l.refresh(0)
    } else {
        //follower 节点中没有存储 lease 的到期工夫
        l.forever()}
​
    le.leaseMap[id] = l
    //lease 信息长久化
    l.persistTo(le.b)
    // 如果是 leader 节点,就将 lease 信息放到最小堆中
    if le.isPrimary() {item := &LeaseWithTime{id: l.ID, time: l.expiry}
        le.leaseExpiredNotifier.RegisterOrUpdate(item)
        le.scheduleCheckpointIfNeeded(l)
    }
​
    return l, nil
}

绑定

lease 创立好之后,就能够通过 Put 指令创立一个数据并与 lease 进行绑定。在 Put 时候,put 的 value 字段中会有一个 leaseID,并存到了 boltDB。这样能够在 etcd 挂掉之后,能够依据长久化存储来复原 lease 与数据的对应关系。

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
    ......
    kv := mvccpb.KeyValue{
        Key:            key,
        Value:          value,
        CreateRevision: c,
        ModRevision:    rev,
        Version:        ver,
        //lease 字段
        Lease:          int64(leaseID),
    }
    //.... 长久化等操作
    //attach 操作
    if leaseID != lease.NoLease {
        if tw.s.le == nil {panic("no lessor to attach lease")
        }
        err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
        if err != nil {panic("unexpected error from lease Attach")
        }
    }
    tw.trace.Step("attach lease to kv pair")
}

lessor 的 Attach 操作会将 lease 与 key 两者进行绑定并存到本身的 itemMap 以及 lease 的 itemSet 中。

func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
    ......
    l := le.leaseMap[id]
    l.mu.Lock()
    for _, it := range items {
        // 存到 lease 的 itemSet
        l.itemSet[it] = struct{}{}
        // 存到 lessor 的 itemMap 中
        le.itemMap[it] = id
    }
    l.mu.Unlock()
    return nil
}

保活

客户端提供的 keepAlive 办法用于 lease 进行续租,每次调用都会使得 lease 的剩余时间回到初始化时候设定的剩余时间。因为 lease 的一些查看以及保护都是由 leader 节点维持,所以当咱们发送申请到 follower 时,会间接将申请重定向到 leader 节点。

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
    // 发送到 follower 会返回 ErrNotPrimary 的谬误
    ttl, err := s.lessor.Renew(id)
    if err == nil {// already requested to primary lessor(leader)
        return ttl, nil
    }
    ......
    for cctx.Err() == nil && err != nil {
        // 获取 leader 节点
        leader, lerr := s.waitLeader(cctx)
        if lerr != nil {return -1, lerr}
        for _, url := range leader.PeerURLs {
            lurl := url + leasehttp.LeasePrefix
             // 通过 http 接口申请到 leader 的 keeplaive 接口
            ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
            if err == nil || err == lease.ErrLeaseNotFound {return ttl, err}
        }
        time.Sleep(50 * time.Millisecond)
    }
    ......
    return -1, ErrCanceled
}

达到 Leader 节点之后会通过 Renew 更新该 lease 的剩余时间,过期工夫以及最小堆中的 lease。

func (le *lessor) Renew(id LeaseID) (int64, error) {le.mu.RLock()
    if !le.isPrimary() {le.mu.RUnlock()
        return -1, ErrNotPrimary
    }
​
    demotec := le.demotec
​
    l := le.leaseMap[id]
    if l == nil {le.mu.RUnlock()
        return -1, ErrLeaseNotFound
    }
    // 当 cp(checkpoint 办法,须要通过 raft 做数据同步的办法)不为空而且剩余时间大于 0 时为 true
    clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
​
    le.mu.RUnlock()
    // 如果 lease 过期
    if l.expired() {
        select {
        case <-l.revokec: //revoke 时候会间接返回
            return -1, ErrLeaseNotFound
        // The expired lease might fail to be revoked if the primary changes.
        // The caller will retry on ErrNotPrimary.
        case <-demotec:
            return -1, ErrNotPrimary
        case <-le.stopC:
            return -1, ErrNotPrimary
        }
    }
​
    if clearRemainingTTL {
        // 通过 checkpoint 办法同步到各个节点 lease 的剩余时间
        le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
    }
​
    le.mu.Lock()
    l.refresh(0)
    item := &LeaseWithTime{id: l.ID, time: l.expiry}
    // 更新最小堆中的 lease
    le.leaseExpiredNotifier.RegisterOrUpdate(item)
    le.mu.Unlock()
    return l.ttl, nil
}

撤销

撤销操作能够由两种形式触发,一种是通过客户端间接调用 Revoke 办法被动触发,一种是 leader 节点检测到 lease 过期时候的被动触发。被动触发绝对简略,follower 节点收到申请后间接调用 raft 模块同步该申请,各个节点收到申请后通过 lessor 被动删除该 lease(删除并没有间接删除 leaseMap 中的 lease,而是敞开对应 revokec),以及删除绑定在下面的 key。

func (le *lessor) Revoke(id LeaseID) error {le.mu.Lock()
    l := le.leaseMap[id]
    // 敞开告诉的管道
    defer close(l.revokec)
    le.mu.Unlock()
​
    if le.rd == nil {return nil}
​
    txn := le.rd()
    //Keys 办法会将 lease 中 itemSet 的 key 取出
    keys := l.Keys()
    sort.StringSlice(keys).Sort()
    // 删除 lease 绑定的 key
    for _, key := range keys {txn.DeleteRange([]byte(key), nil)
    }
​
    le.mu.Lock()
    defer le.mu.Unlock()
    delete(le.leaseMap, l.ID)
    // 删除 boltdb 长久化的 lease
    le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
​
    txn.End()
    return nil
}

被动触发则通过创立 lessor 时候启动的异步协程 runLoop(),每 500ms 轮询调用 revokeExpiredLeases 来查看是否过期。

func (le *lessor) revokeExpiredLeases() {var ls []*Lease
    // rate limit
    revokeLimit := leaseRevokeRate / 2
    le.mu.RLock()
    // 如果是 leader 节点
    if le.isPrimary() {
        // 在 leaseExpiredNotifier 最小堆中找到过期的 lease
        ls = le.findExpiredLeases(revokeLimit)
    }
    le.mu.RUnlock()
    if len(ls) != 0 {
        select {
        case <-le.stopC:
            return
        case le.expiredC <- ls:// 将过期的 lease 发送到 expireC 中
        default:
        }
    }
}

在 etcd 启动时候,会另外启动一个异步 run 协程,会订阅该 expireC,收到音讯后发动一个 Revoke 提案并进行同步操作。

//leassor 通过 ExpiredLeasesC 办法把 expiredC 裸露进去
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {return le.expiredC}
​
//etcd 启动的异步 run 协程
func (s *EtcdServer) run() {
    ......
    var expiredLeaseC <-chan []*lease.Lease
    if s.lessor != nil {expiredLeaseC = s.lessor.ExpiredLeasesC()
    }
    for{
        select{
           case leases := <-expiredLeaseC:// 接到过期音讯
            s.GoAttach(func() {
                for _, lease := range leases {
                    ......
                    lid := lease.ID
                    s.GoAttach(func() {ctx := s.authStore.WithRoot(s.ctx)
                         // 调用 revoke 办法
                        _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
                        ......
                        <-c
                    })
                }
            })
           ......
            // 其余 case 操作
        }
    }
}
​

小结

为了保持数据的一致性,lease 的创立,删除,checkpoint 等都须要通过 raft 模块进行同步,而在续约阶段则间接通过 http 申请发送到 leader 节点,所有的保护与查看工作都在 leader 节点,大体能够用下图来示意。因为作者对 raft 模块了解不够深刻,所以一笔带过。

Reference

  • etcd-v3.5.0 源码 – https://github.com/etcd-io/et…
  • etcd 如何实现租约 – 拉钩教育,etcd 原理与实际
退出移动版