Reprinted from: "etcd实现分布式锁" (Implementing a Distributed Lock with etcd)

When shared resources are accessed concurrently without a lock, there is no way to guarantee their safety and correctness. That is when a lock is needed.

1. Required properties

  1. Mutual exclusion must be guaranteed (in a distributed environment, across different nodes and different threads).
  2. There must be a timeout mechanism, so that a lock left unreleased by accident does not keep other nodes from ever acquiring it; at the same time, a task must be able to run to completion normally: the lease must not expire mid-task and release the lock while the work is still running.
  3. Both blocking and non-blocking lock-acquisition interfaces are needed (a sketch of an interface with these properties follows this list).
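Taken together, these properties suggest an interface along the lines of the sketch below. This is my own illustration, not code from etcd; the DistributedLock name and its methods are hypothetical:

package lock

import "context"

// DistributedLock is a hypothetical interface capturing the three
// properties above. Note that in etcd the timeout property is not a
// method: it is carried by a lease (TTL) attached to the session that
// holds the lock, renewed while the holder is alive.
type DistributedLock interface {
    // Lock blocks until the lock is acquired or ctx is cancelled.
    Lock(ctx context.Context) error
    // TryLock returns immediately with an error if another holder owns the lock.
    TryLock(ctx context.Context) error
    // Unlock releases the lock.
    Unlock(ctx context.Context) error
}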

2. Local locks

When all business logic runs inside the same process, I can initialize a local lock and have every request honor that same lock. This is generally the case when the service is deployed on a single machine.

Consider the example below: what happens if we start 1000 goroutines that concurrently increment a Counter?

package main

import (
    "fmt"
    "sync"
)

var sg sync.WaitGroup

type Counter struct {
    count int
}

// Increment the counter
func (m *Counter) Incr() {
    m.count++
}

// Get the total count
func (m *Counter) Count() int {
    return m.count
}

func main() {
    c := &Counter{}
    for i := 0; i < 1000; i++ {
        sg.Add(1)
        // Simulate concurrent requests
        go func() {
            c.Incr()
            sg.Done()
        }()
    }
    sg.Wait()
    fmt.Println(c.Count())
}

The result is that count is not the expected 1000. Instead, as shown below, the printed value differs on every run, though it is close to 1000:

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
953
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
982
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
984

The cause is that the increment is not protected by a lock: m.count++ is a read-modify-write sequence, so two goroutines can read the same value and both write back value+1, losing an increment.
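You can confirm the race by running the original program with go run -race main.go. As an aside, for a plain counter Go's sync/atomic also removes the race; this is a minimal sketch of that alternative, not the fix this article uses (which follows below):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var count int64
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            // A single atomic read-modify-write; no lock needed
            atomic.AddInt64(&count, 1)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println(count) // always prints 1000
}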

Below we modify the code as follows, adding Go's sync.Mutex inside Incr:

package main

import (
    "fmt"
    "sync"
)

var sg sync.WaitGroup

type Counter struct {
    count int
    mu    sync.Mutex
}

func (m *Counter) Incr() {
    // Take the lock before each write and release it when done
    m.mu.Lock()
    defer m.mu.Unlock()
    m.count++
}

func (m *Counter) Count() int {
    return m.count
}

func main() {
    c := &Counter{}
    for i := 0; i < 1000; i++ {
        sg.Add(1)
        go func() {
            c.Incr()
            sg.Done()
        }()
    }
    sg.Wait()
    fmt.Println(c.Count())
}

Now count is printed as 1000 every time:

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
1000

3. etcd distributed locks

First, deploy a simple etcd cluster:

├── docker-compose.yml
├── etcd
│   └── Dockerfile

Contents of the Dockerfile:

FROM bitnami/etcd:latest
LABEL maintainer="liuyuede123 <liufutianoppo@163.com>"

Contents of docker-compose.yml:

version: '3.5'

# Network configuration
networks:
  backend:
    driver: bridge

# Service containers
services:
  etcd1:                                 # Custom container name
    build:
      context: etcd                      # Directory containing the Dockerfile used for the build
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd1
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports:                               # Port mappings
      - "12379:2379"
      - "12380:2380"
    networks:
      - backend
    restart: always
  etcd2:                                 # Custom container name
    build:
      context: etcd                      # Directory containing the Dockerfile used for the build
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd2
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports:                               # Port mappings
      - "22379:2379"
      - "22380:2380"
    networks:
      - backend
    restart: always
  etcd3:                                 # Custom container name
    build:
      context: etcd                      # Directory containing the Dockerfile used for the build
    environment:
      - TZ=Asia/Shanghai
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_NAME=etcd3
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    ports:                               # Port mappings
      - "32379:2379"
      - "32380:2380"
    networks:
      - backend
    restart: always

Run docker-compose up -d to start the etcd services; you can see the three containers running in Docker.
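As a quick sanity check, assuming etcdctl is installed on the host, you can verify the cluster through the mapped client ports:

docker-compose ps    # the three etcd services should show as Up
etcdctl --endpoints=http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 endpoint health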

Implementing mutual exclusion

package main

import (
    "fmt"
    "sync"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

var sg sync.WaitGroup

type Counter struct {
    count int
}

func (m *Counter) Incr() {
    m.count++
}

func (m *Counter) Count() int {
    return m.count
}

func main() {
    endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}
    // Initialize the etcd client
    client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client.Close()

    counter := &Counter{}
    sg.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            // This creates a lease; the default TTL is 60 seconds
            session, err := concurrency.NewSession(client)
            if err != nil {
                panic(err)
            }
            defer session.Close()
            locker := concurrency.NewLocker(session, "/my-test-lock")
            locker.Lock()
            counter.Incr()
            locker.Unlock()
            sg.Done()
        }()
    }
    sg.Wait()
    fmt.Println("count:", counter.Count())
}

Execution results:

user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100
user@userdeMacBook-Pro  ~/go/src/go-demo/mutex  go run main.go
count: 100

Implementing the timeout mechanism

When a client holds the lock but fails to release it for some reason, it would otherwise hold the lock forever and other clients could never acquire it. A distributed lock therefore needs a timeout mechanism: if the lock is not released, it is released anyway when its etcd lease expires. While the business logic is processing normally, the session keeps renewing the lease before it expires, until the work is finished and the lock is released.
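The default session lease TTL is 60 seconds. If you want an abandoned lock to be released sooner, concurrency.WithTTL shortens the lease when the session is created. A minimal sketch, assuming the same cluster endpoints as above:

package main

import (
    "fmt"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    client, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"http://127.0.0.1:12379"},
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // WithTTL sets the session lease to 10 seconds instead of the default 60,
    // so a lock abandoned by a crashed client is released sooner.
    session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
    if err != nil {
        panic(err)
    }
    defer session.Close()

    locker := concurrency.NewLocker(session, "/my-test-lock")
    locker.Lock()
    fmt.Println("locked with a 10s lease")
    locker.Unlock()
}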

package main

import (
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

type Counter struct {
    count int
}

func (m *Counter) Incr() {
    m.count++
}

func (m *Counter) Count() int {
    return m.count
}

func main() {
    endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}
    client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client.Close()

    counter := &Counter{}
    session, err := concurrency.NewSession(client)
    if err != nil {
        panic(err)
    }
    defer session.Close()

    locker := concurrency.NewLocker(session, "/my-test-lock")
    fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
    locker.Lock()
    fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
    // Simulate the business logic
    time.Sleep(100 * time.Second)
    counter.Incr()
    locker.Unlock()
    fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("count:", counter.Count())
}

Open two terminal windows. In the first, run the program and acquire the lock, then simulate an unexpected exit (Ctrl-C) without ever calling Unlock:

go run main.go
locking... 2022-09-03 23:41:48 # lease created at this time
locked... 2022-09-03 23:41:48
^Csignal: interrupt

In the second window, try to acquire the lock before the first window exits; the call blocks. After the first window exits, the lock is still held because its lease has not yet expired. Once the first window's lease expires (60 seconds by default), the second window acquires the lock successfully:

locking... 2022-09-03 23:41:52
locked... 2022-09-03 23:42:48 # the first lease expired after 60 seconds; lock acquired
released... 2022-09-03 23:44:28
count: 1

Implementing blocking and non-blocking interfaces

The examples above already implement the blocking interface: while one request holds the lock, other requests block until the lock is released.
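There is also a middle ground between blocking forever and failing immediately: concurrency.Mutex's Lock takes a context, so wrapping it in context.WithTimeout bounds how long the caller blocks. A sketch, assuming the same cluster endpoints as above:

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    client, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"http://127.0.0.1:12379"},
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    session, err := concurrency.NewSession(client)
    if err != nil {
        panic(err)
    }
    defer session.Close()

    mutex := concurrency.NewMutex(session, "/my-test-lock")

    // Wait at most 5 seconds for the lock; Lock returns the context's error
    // (e.g. context.DeadlineExceeded) if it cannot acquire in time.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := mutex.Lock(ctx); err != nil {
        fmt.Println("lock not acquired:", err)
        return
    }
    defer mutex.Unlock(context.Background())
    fmt.Println("locked")
}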

The non-blocking approach is to try to acquire the lock and return immediately if it fails. etcd implements this as the TryLock method:

// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {

A concrete example:

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

type Counter struct {
    count int
}

func (m *Counter) Incr() {
    m.count++
}

func (m *Counter) Count() int {
    return m.count
}

func main() {
    endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}
    client, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client.Close()

    counter := &Counter{}
    session, err := concurrency.NewSession(client)
    if err != nil {
        panic(err)
    }
    defer session.Close()

    // Initialize with NewMutex here (NewLocker's sync.Locker has no TryLock)
    locker := concurrency.NewMutex(session, "/my-test-lock")
    fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05"))
    err = locker.TryLock(context.Background())
    // Bail out with an error if the lock could not be acquired
    if err != nil {
        fmt.Println("lock failed", err)
        return
    }
    fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05"))
    time.Sleep(100 * time.Second)
    counter.Incr()
    err = locker.Unlock(context.Background())
    if err != nil {
        fmt.Println("unlock failed", err)
        return
    }
    fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05"))
    fmt.Println("count:", counter.Count())
}

Output from window 1 and window 2:

go run main.go
locking... 2022-09-04 00:00:21
locked... 2022-09-04 00:00:21
released... 2022-09-04 00:02:01
count: 1
go run main.go
locking... 2022-09-04 00:00:27
lock failed mutex: Locked by another session