关于etcd:Etcd-实战练习二

5次阅读

共计 6790 个字符,预计需要花费 17 分钟才能阅读完成。

文章继续更新,微信搜一搜「吴亲强的深夜食堂

上一篇 etcd 实战根底篇 (一) 咱们次要介绍了 etcd 应用场景以及最基础性的一些操作(put、get、watch)。这一篇咱们接着实战 etcd 其余业务场景。

基于 etcd 的分布式锁

基于 etcd 实现一个分布式锁特地简略。etcd 提供了开箱即用的包 concurrency,几行代码就实现一个分布式锁。

package src

import (
  "context"
  "flag"
  "fmt"
  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/clientv3/concurrency"
  "log"
  "strings"
  "time"
)

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化 etcd 客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析 etcd 的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创立一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {fmt.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Lock(id int, lockName string) {client := initEtcdClient()
  defer client.Close()

  // 创立一个 session, 如果程序宕机奔溃,etcd 能够晓得
  s, err := concurrency.NewSession(client)
  if err != nil {log.Fatal(err)
  }
  defer s.Close()

  // 创立一个 etcd locker
  locker := concurrency.NewLocker(s, lockName)

  log.Printf("id:%v 尝试获取锁 %v", id, lockName)
  locker.Lock()
  log.Printf("id:%v 获得锁 %v", id, lockName)

  // 模仿业务耗时
  time.Sleep(time.Millisecond * 300)

  locker.Unlock()
  log.Printf("id:%v 开释锁 %v", id, lockName)
}
 

咱们再写个脚本运行,看看后果。

package main

import (
  "etcd-test/src"
  "sync"
)

func main() {
  var lockName = "locker-test"
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {wg.Add(1)
    go func(item int) {defer wg.Done()
      src.Lock(item, lockName)
    }(i)
  }
  wg.Wait()}
 

咱们发动了 10 个并发抢同一个 key 锁的命令。运行后果如下,

从图片能够看到,同一时刻肯定只有一个 G 失去锁,一个 G 获取到一个锁的前提肯定是以后 key 未被锁。

有人要问了,当一个锁解开时,之前未获取到锁而产生期待的客户端谁先获取到这把锁?这个问题,咱们后续剖析原理的时候再揭晓。

说到分布式锁,不得不提起 redis。它有一个看似平安理论一点都不平安的分布式锁。它的命令模式是,

set key value [EX seconds] [PX milliseconds] [NX|XX]

这其中,介绍两个要害的属性:

  • EX 标示设置过期工夫,单位是秒。
  • NX 示意 当对应的 key 不存在时,才创立。

咱们在应用 redis 做分布式锁的时候会这么写。(代码用了包 https://github.com/go-redis/redis)

func RedisLock(item int) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  fmt.Printf("item:%v 尝试获取锁, 工夫:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()
  if !res {fmt.Printf("item:%v 尝试获取锁失败 \\n", item)
    return
  }

  fmt.Printf("item:%v 获取到锁, 工夫:%v\\n", item, time.Now().String())
  time.Sleep(1 * time.Second) // 模仿业务耗时
  fmt.Printf("item:%v 开释锁,工夫:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}
 
rdb.SetNX(ctx, "key", "value", 2*time.Second)

咱们规定锁的过期工夫是 2 秒,上面有一句 time.Sleep(1 * time.Second) 用来模仿解决业务的耗时。业务解决完结,咱们删除 key rdb.Del(ctx, "key")

咱们写个简略的脚本,

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {wg.Add(1)
    go func(item int) {defer wg.Done()
      RedisLock(item)
    }(i)
  }
  wg.Wait()}
 

咱们开启十个 G 并发的调用 RedisLock 函数。每次调用,函数外部都会新建一个 redis 客户端,实质上是 10 个客户端。

运行这段程序,

从图中看出,同一时刻只有一个客户端获取到锁,并且在一秒的工作解决后,开释了锁,如同没太大的问题。

那么,我再写一个简略的例子。

import (
  "context"
  "fmt"
  "github.com/go-redis/redis/v8"
  "sync"
  "time"
)

var ctx = context.Background()
var rdb *redis.Client

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go func() {defer wg.Done()
    ExampleLock(1, 0)
  }()

  go func() {defer wg.Done()
    ExampleLock(2, 5)
  }()
  wg.Wait()}


func ExampleLock(item int, timeSleep time.Duration) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  if timeSleep > 0 {time.Sleep(time.Second * timeSleep)
  }
  fmt.Printf("item:%v 尝试获取锁, 工夫:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()
  if !res {fmt.Printf("item: 尝试获取锁失败:%v\\n", item)
    return
  }

  fmt.Printf("item:%v 获取到锁, 工夫:%v\\n", item, time.Now().String())
  time.Sleep(7 * time.Second)
  fmt.Printf("item:%v 开释锁,工夫:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}

咱们设置锁的过期工夫是 3 秒,而获取锁之后的工作解决工夫为 7 秒。

而后咱们开启两个 G。

ExampleLock(1, 0)ExampleLock(2, 5)

其中第二行数字 5,从代码中能够看出,是指启动 G 后过 5 秒去获取锁。

这段代码整体流程是这样的:G(1) 获取到锁后,设置的锁持有工夫是 3 秒,因为工作执行须要 7 秒的工夫,因而在 3 秒过后锁会主动开释。G(2) 能够在第 5 秒的时候获取到锁,而后它执行工作也得 7 秒。

最初,G(1)在获取锁后 7 秒执行开释锁的操作,G(2)同理。

发现问题了吗?

G(1) 的锁在 3 秒后曾经主动开释了。然而在工作解决完结后又执行理解锁的操作, 可此时这个锁是 G(2) 的呀。

那么接下来因为 G(1) 误会了 G(2) 的锁,如果此时有其余的 G,那么就能够获取到锁。

等 G(2) 工作执行完结,同理又会误会其余 G 的锁,这是一个恶性循环。这也是掘金一篇由 redis 分布式锁造成茅台超卖重大事故的起因之一。

至于其余的,能够自行查看这篇文章 Redis——由分布式锁造成的重大事故。

基于 etcd 的分布式队列

对队列更多的理论知识就不加以介绍了。咱们都晓得,队列是一种先进先出的数据结构,个别也只有入队和出队两种操作。咱们经常在单机的利用中应用到队列。

那么,如何实现一个分布式的队列呢?。

咱们能够应用 etcd 开箱即用的工具,在 etcd 底层 recipe 包里构造 Queue,实现了一个多读多写的分布式队列。

type Queue struct {
  client *v3.Client
  ctx context.Context

  keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)
 

咱们基于此包能够很不便的实现。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化 etcd 客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析 etcd 的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创立一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {log.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Push(keyName string) {client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  var wg sync.WaitGroup

  for i := 0; i < 3; i++ {
    for j := 0; j < 10; j++ {wg.Add(1)
      go func(item int) {defer wg.Done()
        err := q.Enqueue(strconv.Itoa(item))
        if err != nil {log.Printf("push err:%v\\n", err)
        }
      }(j)
    }
    time.Sleep(2 * time.Second)
  }
  wg.Wait()}

func Pop(keyName string) {client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  for {res, err := q.Dequeue()
    if err != nil {log.Fatal(err)
      return
    }
    log.Printf("接管值:%v\\n", res)
  }
}

在 push 中,咱们开启 3 轮发送值入队,每次发送 10 个,发送一轮劳动 2 秒。在 pop 中,通过死循环获取队列中的值。

运行脚本程序如下。

package main

import (
  "etcd-test/src"
  "time"
)

func main() {
  key := "test-queue"
  go src.Pop(key)
  time.Sleep(1 * time.Second)
  go src.Push(key)
  time.Sleep(20 * time.Second)
}

咱们应用两个 G 代表 别离运行 push 和 pop 操作。同时为了达到运行成果,咱们先运行 pop 期待有入队的元素。运行后果动画如下,

etcd 还提供了优先级的分布式的队列。和下面的用法类似。只是在入队的时候,不仅仅须要提供一个值,还须要提供一个整数,来示意以后 push 值的优先级。数值越小,优先级越高。

咱们改变一下上述的代码。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化 etcd 客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析 etcd 的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创立一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {log.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func PriorityPush(keyName string) {client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  var wg sync.WaitGroup

  for j := 0; j < 10; j++ {wg.Add(1)
    go func(item int) {defer wg.Done()
      err := q.Enqueue(strconv.Itoa(item), uint16(item))
      if err != nil {log.Printf("push err:%v\\n", err)
      }
    }(j)
  }
  wg.Wait()}

func PriorityPop(keyName string) {client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  for {res, err := q.Dequeue()
    if err != nil {log.Fatal(err)
      return
    }
    log.Printf("接管值:%v\\n", res)
  }
}

而后以下是咱们的测试代码:

package main

import (
  "etcd-test/src"
  "sync"
  "time"
)
func main() {
  key := "test-queue"
  var wg sync.WaitGroup
  wg.Add(1)
  go func() {defer wg.Done()
    src.PriorityPush(key)
  }()
  wg.Wait()
  go src.PriorityPop(key)
  time.Sleep(20 * time.Second)
}

咱们把 0 到 9 的数并发的 push 到队列中,对应的优先级整数值就是它自身,push 结束,咱们运行 PriorityPop 函数,看最终结果显示就是从 0 到 9。

总结

这篇文章次要介绍了如何应用 etcd 实现分布式锁以及分布式队列。其余 etcd 的场景,能够自行实际。

正文完
 0