乐趣区

关于etcd:etcd分布式锁的实现

etcd 分布式锁并不是 etcd server 对外提供一个性能 api,而是基于 etcd 的各种个性(lease、watch、mvcc 等)集成的一个工具。

在同一个过程外面,为了防止对共享变量产生数据竞争,通常能够通过加锁解锁的形式来防止。然而如果是多个过程,操作同一份资源,就不能用一般的锁了,这时候的“锁”须要一个能共享的介质来存储,应用它的过程能够通过一般的加锁解锁形式来防止同时操作。

思考的问题

上面咱们从一个最根本的流程剖析下一个分布式锁要思考哪些事件。

加锁

在对共享资源操作时候,首先须要加锁,在加锁时候,抢到锁的过程能够间接返回,进而操作共享资源,而没有抢到锁的过程须要期待锁的开释,对于同一个锁,同一时刻只能有一个过程来持有,这体现了锁的互斥性。

锁期间

因为是多过程状况,须要思考过程宕机的状况,如果抢到锁的过程忽然宕机,须要可能有开释锁的机制,防止前面的过程始终阻塞导致死锁。提供锁的组件也应该具备高可用性,在某个节点宕机后可能持续提供服务。

解锁

对资源的操作完结之后,须要及时开释锁,然而不能开释其余过程的锁,前面没有抢到锁的过程能够取得锁。如果抢锁的过程过多,可能会导致惊群效应,提供锁的组件应在肯定水平上防止该景象。

实现思路

etcd 的几种非凡的机制都能够作为分布式锁的根底。etcd 的键值对能够作为锁的本体,锁的创立与删除对应键值对的创立与删除。etcd 的分布式一致性以及高可用能够保障锁的高可用性。

prefix

因为 etcd 反对前缀查找,能够将锁设置成“锁名称”+“惟一 id”的格局,保障锁的对称性,即每个客户端只操作本人持有的锁。

lease

租约机制能够为锁做一个保活操作,在创立锁的时候绑定租约,并定期进行续约,如果取得锁期间客户端意外宕机,则持有的锁会被主动删除,防止了死锁的产生。

Revision

etcd 外部保护了一个全局的 Revision 值,并会随着事务的递增而递增。能够用 Revision 值的大小来决定获取锁的先后顺序,在上锁的时候曾经决定了获取锁先后顺序,后续有客户端开释锁也不会产生惊群效应。

watch

watch 机制能够用于监听锁的删除事件,不用应用忙轮询的形式查看是否开释了锁,更加高效。同时,在 watch 时候能够通过 Revision 来进行监听,只须要监听间隔本人最近而且比本人小的一个 Revision 就能够做到锁的实时获取。

源码剖析

在 etcdv3 版本的客户端库中曾经有了分布式锁的实现,让咱们看一下实现逻辑。

示例
func main() {
    // 初始化 etcd 客户端
    cli, _ := clientv3.New(clientv3.Config{Endpoints:   []string{"127.0.0.1:23790"},
        DialTimeout: time.Second,
    })
    // 创立一个 session,并依据业务状况设置锁的 ttl
    s, _ := concurrency.NewSession(cli, concurrency.WithTTL(3))
    defer s.Close()
    // 初始化一个锁的实例,并进行加锁解锁操作。mu := concurrency.NewMutex(s, "mutex-linugo")
    if err := mu.Lock(context.TODO()); err != nil {log.Fatal("m lock err:", err)
    }
    //do something
    if err := mu.Unlock(context.TODO()); err != nil {log.Fatal("m unlock err:", err)
    }
}

在调用 NewSession 办法时候实际上是初始化了一个用户指定行为的租约(行为能够是指定 ttl,或者复用其余的 lease 等),并异步进行 keepalive。

type Mutex struct {
    s *Session // 保留的租约相干的信息

    pfx   string // 锁的名称,key 的前缀
    myKey string // 锁残缺的 key
    myRev int64  // 本人的版本号
    hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {return &Mutex{s, pfx + "/", "", -1, nil}
}

NewMutex 实际上创立了一个锁的数据结构,该构造能够保留一些锁的信息,入参的“mutex-linugo”只是一个 key 的前缀,还有后续要创立的残缺 key,revision 等信息。

Lock
func (m *Mutex) Lock(ctx context.Context) error {
    // 首先尝试获取锁
    resp, err := m.tryAcquire(ctx)
    if err != nil {return err}
    ......
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
    s := m.s
    client := m.s.Client()
    // 残缺 key 是前缀名称加租约 ID,因为不同过程生成的不同租约,所以锁互不雷同
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    //cmp 通过比拟 createRevision 是否为 0 判断以后的 key 是不是第一次创立
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    //put 会把 key 绑定上租约并存储
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    //get 会获取以后 key 的值
    get := v3.OpGet(m.myKey)
    //getOwner 是通过前缀来范畴查找,WithFirstCreate() 筛选出以后存在的最小 revision 对应的值
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {return nil, err}
    // 将该事务的 revision 赋值到锁的 myRev 字段
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    return resp, nil
}

在获取锁的时候,通过事务操作来尝试加锁。

如果以后的 key 是第一次创立,则将 key 绑定租约并存储,否则获取以后的 key 详细信息。getOwner 通过前缀来进行查找最小 revision 对应的值,目标是获取以后锁的持有者(如果最小 Revison 的 key 开释锁,则该 key 会被删除,所以最小 Revision 的 key 就是以后锁的持有者)。

!resp.Succeeded 代表 key 不是第一次创立,则之前执行的是 get 操作,获取该 key 创立时候的 revision 并赋值到锁的 myRev 字段。

回到主函数,目前 etcd 中曾经存有锁相干信息了,前面会通过比拟 Revision 来判断本人取得了锁还是须要期待锁,如果本人的 myRev 与 ownerKey 的 Revsion 雷同,阐明本人就是锁的持有者。

func (m *Mutex) Lock(ctx context.Context) error {resp, err := m.tryAcquire(ctx)
    if err != nil {return err}
    //ownerKey 就是以后持有锁的值
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    // 如果 ownerKey 的长度为 0 或者持有者的 Revision 与本人的 Revision 雷同,阐明本人持有锁,能够间接返回,并对共享资源进行操作
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    ......
    // 期待锁的开释
    client := m.s.Client()
    _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    if werr != nil {m.Unlock(client.Ctx())
        return werr
    }
    // 确保 session 没有过期
    gresp, werr := client.Get(ctx, m.myKey)
    if werr != nil {m.Unlock(client.Ctx())
        return werr
    }

    if len(gresp.Kvs) == 0 {return ErrSessionExpired}
    m.hdr = gresp.Header

    return nil
}
waitDeletes

如果没有取得锁,就须要期待后面锁的开释了,这里次要用到 watch 机制。

func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //getOpts 会通过两个 Option 函数获取小于传入的 maxCreateRev 的 Revision 的 key 汇合且找出汇合中最大的 Revison 对应的 key
    // 次要是用于获取前一个上锁的 key,进而能够 watch 该 key 的删除事件
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        //get 通过 getOpts 的动作来获取键值对
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {return nil, err}
        // 如果长度是 0,阐明 key 不存在,代表被删除,后面的锁曾经被开释了,能够间接返回
        if len(resp.Kvs) == 0 {return resp.Header, nil}
        lastKey := string(resp.Kvs[0].Key)
        // 否则通过 watch 监听上一个锁的删除事件
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {return nil, err}
    }
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var wr v3.WatchResponse
    // 通过 Revsion 来 watch key,也就是前一个锁
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
             // 监听 Delete 事件
            if ev.Type == mvccpb.DELETE {return nil}
        }
    }
    if err := wr.Err(); err != nil {return err}
    if err := ctx.Err(); err != nil {return err}
    return fmt.Errorf("lost watcher waiting for delete")
}

waitDeletes 失常返回后该过程会取得锁,进入操作共享资源。

UnLock

解锁操作会间接删除对应的 kv,这会触发下一个锁的获取。

func (m *Mutex) Unlock(ctx context.Context) error {client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {return err}
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

小结

etcd 分布式锁稳固的背地是对其自身各种个性的充分利用。本节咱们首先剖析了分布式锁满足的个性,其次列举了 etcd 的各种个性对分布式锁的反对状况,最初剖析了 clientV3 的 concurrency 包是怎么实现分布式锁性能的。

Reference

  • etcdV3.5.1 源码 – https://github.com/etcd-io/et…
  • 拉勾教育 – 如何基于 etcd 实现分布式锁?
退出移动版