关于redis:我是如何用-redis-分布式锁来解决线上历史业务问题的

3次阅读

共计 3047 个字符,预计需要花费 8 分钟才能阅读完成。

近期发现,开发性能的时候发现了一个 mq 生产程序错乱(历史遗留问题),导致业务异样的问题,看看我是如何解决的

问题抛出

首先,简略介绍一下状况:

线上 k8s 有多个 pod 会去生产 mq 中的音讯,可是生产者发送的音讯是冀望肯定要有序去生产,此时要表白的是,例如 生产者如果发送了 3 个告诉音讯,别离是

  • 1 零碎曾经在 / 组上面增加 a 组,你记得绑定策略(例如 / 组绑定的是策略是:容许看视频类型的网站)

<!—->

  • 2 零碎曾经在 /a 组下增加了 b 组,你记得绑定策略(冀望绑定的策略和他的父组策略一样)

<!—->

  • 3 零碎曾经在 b 组上面增加 小 d 用户,你的绑定策略(冀望绑定的策略和他的所在组一样)

此处,若有 3 个 pod 的别离拿到了上述 3 条音讯,然而本身理论生产结束的程序可能是 先实现了 3 音讯对应的业务逻辑,再是 2 音讯 的业务逻辑,最初是 1 音讯的业务逻辑

那么这个时候,小 d 用户就没有绑定上 容许看视频类型的网站 这一条策略,天然 b 组 和 a 组也没有绑定上这条策略,这就和咱们预期的齐全不统一了

当然,理论状况对于单条单条的音讯解决根本不会呈现这种偏差,然而在批量解决的时候,就会呈现理论业务解决程序与冀望不统一的状况,那么就是妥妥的线上问题了(小 d 上网的时候想看视频,可是始终看不了,于是就疯狂投诉。。。)

思考解决

对于这个问题如何解决呢?

咱们晓得,咱们应用 mq 的目标是为了做到去解决咱们的异步逻辑,还能对流量进行削峰,服务间解耦

对于咱们的 A 服务,曾经解决了对于增加用户的,增加组的逻辑,发送告诉音讯给到 B 服务的时候,B 服务本身的解决程序,未依照既定的程序实在依照程序生产结束,导致呈现了业务问题

想法一

咱们是冀望 B 服务团队去增加批量接口,A 服务将须要告诉的信息,排序好给到 B 服务,一个整包,B 服务的单个 pod 接管到这个大包,而后依照程序解决音讯即可,然而这个形式弊病比拟显著

  • 当发送了多个批量大包音讯的时候,B 服务如果本身解决不过去,也会导致相似的问题,无奈根治

<!—->

  • 须要 B 服务新增和批改的代码较多,必定谈不下来

<!—->

  • 而且对于绑定策略的服务来说,不仅仅是 B 服务,还有 C 服务,D 服务呢,他们都要革新 … 这个想法就。。。

想法二

对于这一个业务,也不能去对整个架构大改,对于这些历史遗留问题,能少动就少动,兄弟们你们都懂的

于是便想出了应用 redis 分布式锁来解决,对于一个部署在 k8s 中服务的多个 pod 去抢占,谁先抢到锁,那么就谁生产 mq 中的音讯,没有抢到锁的 pod,那就过一会再抢

当然,对于其余类型的业务是没有影响的

如何去实现这个想法呢,咱们能够模仿一下

  • 1 首先,咱们设置一个 redis 的 key,例如 [服务名]_lockmq,值的话咱们就任意设置,默认就用 服务名 做 value 吧,过期工夫暂定 30 秒,有需要的能够调大

<!—->

  • 2 如果设置胜利,则解决胜利之后的事件

    • 2.1 初始化 mq 消费者,并开启协程进行生产

      • 2.1.1 如果初始化失败,则间接返回,退到第 1 步
    • 2.2 对 redis 锁进行续期,此处咱们 10 秒续期一次

      • 如果续期失败,则间接返回,退到第 1 步

<!—->

  • 3 若拿锁失败,则劳动 10 秒再去拿锁

这样来解决的话,咱们就能够应答多个 pod 来生产同一类音讯的时候,保障同时只有一个 pod 在解决 mq 中的音讯了,当然如果正在解决音讯的 pod 呈现了异样,对于其余 pod,最晚会在 40 秒之后拿到锁,对于大量的音讯来说,这个还是能够容忍的

对应的代码逻辑如下:

  • 简略连贯 redis,redis 分布式锁的主逻辑如下

    • 连贯 redis,DB 默认为 0 号
var rdb = redis.NewClient(&redis.Options{
   Addr:     "localhost:6379",
   Password: "123456",
   DB:       0,
})

func LockMq(svrName string) {key := fmt.Sprintf("%s_lockmq", svrName)
   // 尝试加锁
   var set bool
   for {set = redisLock(key, svrName, time.Second*30)
      if set {log.Println("redisLock success")
         if err := afterLockSuccess(key); err != nil {
            // 如果此处有 err,天然是 mq 初始化失败
            log.Println("mq init error:", err)
         }else{log.Println("redisLock expire failed")
         }
         time.Sleep(time.Second * 10)
         continue
      }
      // afterLockFailed()
      log.Println("redisLock failed")
      time.Sleep(time.Second * 10)
   }
}
  • 根本的加锁实现

    • 设置 key,value,过期工夫为 30 秒
func redisLock(key, value string, duration time.Duration) bool {set, err := rdb.SetNX(context.TODO(), key, value, duration).Result()
   if err != nil {log.Println("setnx failed, error:", err)
      return false
   }
   return set
}
  • 加锁胜利之后,初始化 mq 客户端并进行生产,续期 redis 分布式锁
func afterLockSuccess(key string) error {
   // 初始化须要做的内容或者句柄
   // xxx
   // 对于此处的初始化 mq 句柄失败才返回 err
   ch := make(chan struct{}, 1)
   go func() {
      // 模仿生产音讯
      for {
         select {
         case <-ch:
            log.Println("expire failed,mq close")
            return
         default:
            log.Println("is consuming msg")
            time.Sleep(time.Second * 2)
         }
      }
   }()

   for {time.Sleep(time.Second*10)
      // 续期
      set, err := rdb.PExpire(context.TODO(), key, time.Second*30).Result()
      if err != nil {log.Println("PExpire error!!", err)
         return nil
      }
      if !set {ch <- struct{}{}
         log.Println("PExpire failed!!")
         return nil
      }
      log.Println("PExpire success!!")
   }

}

具体的测试间接调用 LockMq 函数即可

func main(){go redislock.LockMq("helloworld")
   select{}}

模仿启动多个 pod 去抢锁,抢到锁的执行业务,持续续期,抢不到锁的劳动一会再接着抢

程序 a 先启动,程序 b 后启动

程序 a 日志如下:

程序 a 起来之后,启动一段时间之后,kill 掉 程序 a

程序 b 日志如下:

程序 b 先是获取锁失败,过 30s 左右,程序 b 能失常获取到锁

对于源码能够查看地址:https://github.com/qingconglaixueit/my_redis_demo

感激浏览,欢送交换,点个赞,关注一波 再走吧

欢送点赞,关注,珍藏

敌人们,你的反对和激励,是我保持分享,提高质量的能源

好了,本次就到这里

技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。

我是 阿兵云原生,欢送点赞关注珍藏,下次见~

正文完
 0