文章继续更新,微信搜一搜「 吴亲强的深夜食堂 」
上一篇etcd 实战根底篇(一)咱们次要介绍了 etcd 应用场景以及最基础性的一些操作(put、get、watch)。 这一篇咱们接着实战etcd其余业务场景。
基于 etcd 的分布式锁
基于 etcd 实现一个分布式锁特地简略。etcd 提供了开箱即用的包 concurrency,几行代码就实现一个分布式锁。
package srcimport ( "context" "flag" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" "strings" "time")var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")// 初始化etcd客户端func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,编程[]string endpoints := strings.Split(*addr, ",") // 创立一个 etcd 的客户端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { fmt.Printf("初始化客户端失败:%v\\n", err) log.Fatal(err) } return client}func Lock(id int, lockName string) { client := initEtcdClient() defer client.Close() // 创立一个 session,如果程序宕机奔溃,etcd能够晓得 s, err := concurrency.NewSession(client) if err != nil { log.Fatal(err) } defer s.Close() // 创立一个etcd locker locker := concurrency.NewLocker(s, lockName) log.Printf("id:%v 尝试获取锁%v", id, lockName) locker.Lock() log.Printf("id:%v获得锁%v", id, lockName) // 模仿业务耗时 time.Sleep(time.Millisecond * 300) locker.Unlock() log.Printf("id:%v开释锁%v", id, lockName)}
咱们再写个脚本运行,看看后果。
package mainimport ( "etcd-test/src" "sync")func main() { var lockName = "locker-test" var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(item int) { defer wg.Done() src.Lock(item, lockName) }(i) } wg.Wait()}
咱们发动了10个并发抢同一个 key 锁的命令。运行后果如下,
从图片能够看到,同一时刻肯定只有一个 G 失去锁,一个 G 获取到一个锁的前提肯定是以后 key 未被锁。
有人要问了,当一个锁解开时,之前未获取到锁而产生期待的客户端谁先获取到这把锁? 这个问题,咱们后续剖析原理的时候再揭晓。
说到分布式锁,不得不提起 redis。它有一个看似平安理论一点都不平安的分布式锁。它的命令模式是,
set key value [EX seconds] [PX milliseconds] [NX|XX]
这其中,介绍两个要害的属性:
- EX 标示设置过期工夫,单位是秒。
- NX 示意 当对应的 key 不存在时,才创立。
咱们在应用 redis 做分布式锁的时候会这么写。(代码用了包 https://github.com/go-redis/redis
)
func RedisLock(item int) { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", DB: 0, }) fmt.Printf("item:%v 尝试获取锁,工夫:%v\\n", item, time.Now().String()) res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result() if !res { fmt.Printf("item:%v 尝试获取锁失败\\n", item) return } fmt.Printf("item:%v 获取到锁,工夫:%v\\n", item, time.Now().String()) time.Sleep(1 * time.Second) //模仿业务耗时 fmt.Printf("item:%v 开释锁,工夫:%v\\n", item, time.Now().String()) rdb.Del(ctx, "key")}
rdb.SetNX(ctx, "key", "value", 2*time.Second)
咱们规定锁的过期工夫是2秒,上面有一句 time.Sleep(1 * time.Second)
用来模仿解决业务的耗时。业务解决完结,咱们删除 key rdb.Del(ctx, "key")
。
咱们写个简略的脚本,
func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(item int) { defer wg.Done() RedisLock(item) }(i) } wg.Wait()}
咱们开启十个 G 并发的调用 RedisLock
函数。每次调用,函数外部都会新建一个 redis 客户端,实质上是10个客户端。
运行这段程序,
从图中看出,同一时刻只有一个客户端获取到锁,并且在一秒的工作解决后,开释了锁,如同没太大的问题。
那么,我再写一个简略的例子。
import ( "context" "fmt" "github.com/go-redis/redis/v8" "sync" "time")var ctx = context.Background()var rdb *redis.Clientfunc main() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() ExampleLock(1, 0) }() go func() { defer wg.Done() ExampleLock(2, 5) }() wg.Wait()}func ExampleLock(item int, timeSleep time.Duration) { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", DB: 0, }) if timeSleep > 0 { time.Sleep(time.Second * timeSleep) } fmt.Printf("item:%v 尝试获取锁,工夫:%v\\n", item, time.Now().String()) res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result() if !res { fmt.Printf("item:尝试获取锁失败:%v\\n", item) return } fmt.Printf("item:%v 获取到锁,工夫:%v\\n", item, time.Now().String()) time.Sleep(7 * time.Second) fmt.Printf("item:%v 开释锁,工夫:%v\\n", item, time.Now().String()) rdb.Del(ctx, "key")}
咱们设置锁的过期工夫是 3 秒,而获取锁之后的工作解决工夫为 7 秒。
而后咱们开启两个 G。
ExampleLock(1, 0)ExampleLock(2, 5)
其中第二行数字5,从代码中能够看出,是指启动 G 后过5秒去获取锁。
这段代码整体流程是这样的:G(1) 获取到锁后,设置的锁持有工夫是3秒,因为工作执行须要7秒的工夫,因而在3秒过后锁会主动开释。G(2) 能够在第5秒的时候获取到锁,而后它执行工作也得7秒。
最初,G(1)在获取锁后7秒执行开释锁的操作,G(2)同理。
发现问题了吗?
G(1) 的锁在3秒后曾经主动开释了。然而在工作解决完结后又执行理解锁的操作,可此时这个锁是 G(2) 的呀。
那么接下来因为 G(1) 误会了 G(2) 的锁,如果此时有其余的 G,那么就能够获取到锁。
等 G(2) 工作执行完结,同理又会误会其余 G 的锁,这是一个恶性循环。 这也是掘金一篇由 redis 分布式锁造成茅台超卖重大事故的起因之一。
至于其余的,能够自行查看这篇文章Redis——由分布式锁造成的重大事故。
基于 etcd 的分布式队列
对队列更多的理论知识就不加以介绍了。咱们都晓得,队列是一种先进先出的数据结构,个别也只有入队和出队两种操作。 咱们经常在单机的利用中应用到队列。
那么,如何实现一个分布式的队列呢?。
咱们能够应用 etcd 开箱即用的工具,在 etcd
底层 recipe
包里构造 Queue
,实现了一个多读多写的分布式队列。
type Queue struct { client *v3.Client ctx context.Context keyPrefix string}func NewQueue(client *v3.Client, keyPrefix string) *Queuefunc (q *Queue) Dequeue() (string, error)func (q *Queue) Enqueue(val string)
咱们基于此包能够很不便的实现。
package srcimport ( "github.com/coreos/etcd/clientv3" recipe "github.com/coreos/etcd/contrib/recipes" "log" "strconv" "strings" "sync" "time")var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")// 初始化etcd客户端func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,编程[]string endpoints := strings.Split(*addr, ",") // 创立一个 etcd 的客户端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { log.Printf("初始化客户端失败:%v\\n", err) log.Fatal(err) } return client}func Push(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewQueue(client, keyName) var wg sync.WaitGroup for i := 0; i < 3; i++ { for j := 0; j < 10; j++ { wg.Add(1) go func(item int) { defer wg.Done() err := q.Enqueue(strconv.Itoa(item)) if err != nil { log.Printf("push err:%v\\n", err) } }(j) } time.Sleep(2 * time.Second) } wg.Wait()}func Pop(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewQueue(client, keyName) for { res, err := q.Dequeue() if err != nil { log.Fatal(err) return } log.Printf("接管值:%v\\n", res) }}
在 push
中,咱们开启3轮发送值入队,每次发送10个,发送一轮劳动2秒。 在 pop
中,通过死循环获取队列中的值。
运行脚本程序如下。
package mainimport ( "etcd-test/src" "time")func main() { key := "test-queue" go src.Pop(key) time.Sleep(1 * time.Second) go src.Push(key) time.Sleep(20 * time.Second)}
咱们应用两个 G
代表 别离运行 push
和 pop
操作。 同时为了达到运行成果,咱们先运行 pop 期待有入队的元素。 运行后果动画如下,
etcd
还提供了优先级的分布式的队列。和下面的用法类似。只是在入队的时候,不仅仅须要提供一个值,还须要提供一个整数,来示意以后 push
值的优先级。数值越小,优先级越高。
咱们改变一下上述的代码。
package srcimport ( "github.com/coreos/etcd/clientv3" recipe "github.com/coreos/etcd/contrib/recipes" "log" "strconv" "strings" "sync" "time")var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")// 初始化etcd客户端func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,编程[]string endpoints := strings.Split(*addr, ",") // 创立一个 etcd 的客户端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { log.Printf("初始化客户端失败:%v\\n", err) log.Fatal(err) } return client}func PriorityPush(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewPriorityQueue(client, keyName) var wg sync.WaitGroup for j := 0; j < 10; j++ { wg.Add(1) go func(item int) { defer wg.Done() err := q.Enqueue(strconv.Itoa(item), uint16(item)) if err != nil { log.Printf("push err:%v\\n", err) } }(j) } wg.Wait()}func PriorityPop(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewPriorityQueue(client, keyName) for { res, err := q.Dequeue() if err != nil { log.Fatal(err) return } log.Printf("接管值:%v\\n", res) }}
而后以下是咱们的测试代码:
package mainimport ( "etcd-test/src" "sync" "time")func main() { key := "test-queue" var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() src.PriorityPush(key) }() wg.Wait() go src.PriorityPop(key) time.Sleep(20 * time.Second)}
咱们把0到9的数并发的 push
到队列中,对应的优先级整数值就是它自身,push
结束,咱们运行 PriorityPop
函数,看最终结果显示就是从0到9。
总结
这篇文章次要介绍了如何应用 etcd 实现分布式锁以及分布式队列。其余etcd的场景,能够自行实际。