转载自:etcd实现分布式锁
当并发的访问共享资源的时候,如果没有加锁的话,无奈保障共享资源安全性和正确性。这个时候就须要用到锁
1、须要具备的个性
- 须要保障互斥拜访(分布式环境须要保障不同节点、不同线程的互斥拜访)
- 须要有超时机制,避免锁意外未开释,其余节点无奈获取到锁;也要保障工作可能失常执行实现,不能超时了工作还没完结,导致工作执行个别被开释锁
- 须要有阻塞和非阻塞两种申请锁的接口
2、本地锁
当业务执行在同一个线程内,也就是我初始化一个本地锁,其余申请也认这把锁。个别是服务部署在单机环境下。
咱们能够看下上面的例子,开1000个goroutine并发的给Counter做自增操作,后果会是什么样的呢?
package mainimport ( "fmt" "sync")var sg sync.WaitGrouptype Counter struct { count int}// 自增操作func (m *Counter) Incr() { m.count++}// 获取总数func (m *Counter) Count() int { return m.count}func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) // 模仿并发申请 go func() { c.Incr() sg.Done() }() } sg.Wait() fmt.Println(c.Count())}
后果是count的数量并不是料想中的1000,而是上面这样,每次打印出的后果都不一样,然而靠近1000
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go953 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go982 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go984
呈现这个问题的起因就是没有给自增操作加锁
上面咱们批改代码如下,在Incr中加上go的mutex互斥锁
package mainimport ( "fmt" "sync")var sg sync.WaitGrouptype Counter struct { count int mu sync.Mutex}func (m *Counter) Incr() { // 每次写之前先加锁,写完之后开释锁 m.mu.Lock() defer m.mu.Unlock() m.count++}func (m *Counter) Count() int { return m.count}func main() { c := &Counter{} for i := 0; i < 1000; i++ { sg.Add(1) go func() { c.Incr() sg.Done() }() } sg.Wait() fmt.Println(c.Count())}
能够看到当初count失常输入1000了
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.go1000
3、etcd分布式锁
简略部署一个etcd集群
├── docker-compose.yml├── etcd│ └── Dockerfile
Dockerfile文件内容
FROM bitnami/etcd:latestLABEL maintainer="liuyuede123 <liufutianoppo@163.com>"
Docker-compose.yml内容
version: '3.5'# 网络配置networks: backend: driver: bridge# 服务容器配置services: etcd1: # 自定义容器名称 build: context: etcd # 指定构建应用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd1 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "12379:2379" - "12380:2380" networks: - backend restart: always etcd2: # 自定义容器名称 build: context: etcd # 指定构建应用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd2 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "22379:2379" - "22380:2380" networks: - backend restart: always etcd3: # 自定义容器名称 build: context: etcd # 指定构建应用的 Dockerfile 文件 environment: - TZ=Asia/Shanghai - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd3 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new ports: # 设置端口映射 - "32379:2379" - "32380:2380" networks: - backend restart: always
执行docker-compose up -d
启动etcd服务,能够看到docker中曾经启动了3个服务
实现互斥拜访
package mainimport ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync")var sg sync.WaitGrouptype Counter struct { count int}func (m *Counter) Incr() { m.count++}func (m *Counter) Count() int { return m.count}func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} // 初始化etcd客户端 client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close() counter := &Counter{} sg.Add(100) for i := 0; i < 100; i++ { go func() { // 这里会生成租约,默认是60秒 session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close() locker := concurrency.NewLocker(session, "/my-test-lock") locker.Lock() counter.Incr() locker.Unlock() sg.Done() }() } sg.Wait() fmt.Println("count:", counter.Count())}
执行后果:
user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100 user@userdeMacBook-Pro ~/go/src/go-demo/mutex go run main.gocount: 100
实现超时机制
当某个客户端持有锁时,因为某些起因导致锁未开释,就会导致这个客户端始终持有这把锁,其余客户端始终获取不到锁。所以须要分布式锁实现超时机制,当锁未开释时,会因为etcd的租约会到期而开释锁。当业务失常解决时,租约到期之前会持续续约,晓得业务处理完毕开释锁。
package mainimport ( "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")var sg sync.WaitGrouptype Counter struct { count int}func (m *Counter) Incr() { m.count++}func (m *Counter) Count() int { return m.count}func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close() counter := &Counter{} session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close() locker := concurrency.NewLocker(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) locker.Lock() fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) // 模仿业务 time.Sleep(100 * time.Second) counter.Incr() locker.Unlock() fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("count:", counter.Count())}
命令行开2个窗口,第一个窗口执行程序并获取锁,之后模仿意外退出并没有调用unlock办法
go run main.golocking... 2022-09-03 23:41:48 # 租约生成工夫locked... 2022-09-03 23:41:48^Csignal: interrupt
第二个窗口,在第一个窗口退出之前尝试获取锁,此时是阻塞状态。第一个窗口退出之后因为租约还没到期,第二个窗口还是获取锁的状态。等到第一个窗口租约到期(默认60秒),第二个获取锁胜利
locking... 2022-09-03 23:41:52locked... 2022-09-03 23:42:48 # 第一个租约60秒到期,获取锁胜利released... 2022-09-03 23:44:28count: 1
实现阻塞和非阻塞接口
下面的例子中曾经实现了阻塞接口,即以后有获取到锁的申请,则其余申请阻塞期待锁开释
非阻塞的形式就是尝试获取锁,如果失败立刻返回。etcd中是实现了tryLock办法
// TryLock locks the mutex if not already locked by another session.// If lock is held by another session, return immediately after attempting necessary cleanup// The ctx argument is used for the sending/receiving Txn RPC.func (m *Mutex) TryLock(ctx context.Context) error {
具体看上面的例子
package mainimport ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "sync" "time")var sg sync.WaitGrouptype Counter struct { count int}func (m *Counter) Incr() { m.count++}func (m *Counter) Count() int { return m.count}func main() { endpoints := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"} client, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { fmt.Println(err) return } defer client.Close() counter := &Counter{} session, err := concurrency.NewSession(client) if err != nil { panic(err) } defer session.Close() // 此处应用newMutex初始化 locker := concurrency.NewMutex(session, "/my-test-lock") fmt.Println("locking...", time.Now().Format("2006-01-02 15:04:05")) err = locker.TryLock(context.Background()) // 获取锁失败就抛错 if err != nil { fmt.Println("lock failed", err) return } fmt.Println("locked...", time.Now().Format("2006-01-02 15:04:05")) time.Sleep(100 * time.Second) counter.Incr() err = locker.Unlock(context.Background()) if err != nil { fmt.Println("unlock failed", err) return } fmt.Println("released...", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("count:", counter.Count())}
窗口1、窗口2执行后果
go run main.golocking... 2022-09-04 00:00:21locked... 2022-09-04 00:00:21released... 2022-09-04 00:02:01count: 1
go run main.golocking... 2022-09-04 00:00:27lock failed mutex: Locked by another session