共计 3047 个字符,预计需要花费 8 分钟才能阅读完成。
近期发现,开发性能的时候发现了一个 mq 生产程序错乱(历史遗留问题),导致业务异样的问题,看看我是如何解决的
问题抛出
首先,简略介绍一下状况:
线上 k8s 有多个 pod 会去生产 mq 中的音讯,可是生产者发送的音讯是冀望肯定要有序去生产,此时要表白的是,例如 生产者如果发送了 3 个告诉音讯,别离是
- 1 零碎曾经在 / 组上面增加 a 组,你记得绑定策略(例如 / 组绑定的是策略是:容许看视频类型的网站)
<!—->
- 2 零碎曾经在 /a 组下增加了 b 组,你记得绑定策略(冀望绑定的策略和他的父组策略一样)
<!—->
- 3 零碎曾经在 b 组上面增加 小 d 用户,你的绑定策略(冀望绑定的策略和他的所在组一样)
此处,若有 3 个 pod 的别离拿到了上述 3 条音讯,然而本身理论生产结束的程序可能是 先实现了 3 音讯对应的业务逻辑,再是 2 音讯 的业务逻辑,最初是 1 音讯的业务逻辑
那么这个时候,小 d 用户就没有绑定上 容许看视频类型的网站 这一条策略,天然 b 组 和 a 组也没有绑定上这条策略,这就和咱们预期的齐全不统一了
当然,理论状况对于单条单条的音讯解决根本不会呈现这种偏差,然而在批量解决的时候,就会呈现理论业务解决程序与冀望不统一的状况,那么就是妥妥的线上问题了(小 d 上网的时候想看视频,可是始终看不了,于是就疯狂投诉。。。)
思考解决
对于这个问题如何解决呢?
咱们晓得,咱们应用 mq 的目标是为了做到去解决咱们的异步逻辑,还能对流量进行削峰,服务间解耦
对于咱们的 A 服务,曾经解决了对于增加用户的,增加组的逻辑,发送告诉音讯给到 B 服务的时候,B 服务本身的解决程序,未依照既定的程序实在依照程序生产结束,导致呈现了业务问题
想法一
咱们是冀望 B 服务团队去增加批量接口,A 服务将须要告诉的信息,排序好给到 B 服务,一个整包,B 服务的单个 pod 接管到这个大包,而后依照程序解决音讯即可,然而这个形式弊病比拟显著
- 当发送了多个批量大包音讯的时候,B 服务如果本身解决不过去,也会导致相似的问题,无奈根治
<!—->
- 须要 B 服务新增和批改的代码较多,必定谈不下来
<!—->
- 而且对于绑定策略的服务来说,不仅仅是 B 服务,还有 C 服务,D 服务呢,他们都要革新 … 这个想法就。。。
想法二
对于这一个业务,也不能去对整个架构大改,对于这些历史遗留问题,能少动就少动,兄弟们你们都懂的
于是便想出了应用 redis 分布式锁来解决,对于一个部署在 k8s 中服务的多个 pod 去抢占,谁先抢到锁,那么就谁生产 mq 中的音讯,没有抢到锁的 pod,那就过一会再抢
当然,对于其余类型的业务是没有影响的
如何去实现这个想法呢,咱们能够模仿一下
- 1 首先,咱们设置一个 redis 的 key,例如 [服务名]_lockmq,值的话咱们就任意设置,默认就用 服务名 做 value 吧,过期工夫暂定 30 秒,有需要的能够调大
<!—->
-
2 如果设置胜利,则解决胜利之后的事件
-
2.1 初始化 mq 消费者,并开启协程进行生产
- 2.1.1 如果初始化失败,则间接返回,退到第 1 步
-
2.2 对 redis 锁进行续期,此处咱们 10 秒续期一次
- 如果续期失败,则间接返回,退到第 1 步
-
<!—->
- 3 若拿锁失败,则劳动 10 秒再去拿锁
这样来解决的话,咱们就能够应答多个 pod 来生产同一类音讯的时候,保障同时只有一个 pod 在解决 mq 中的音讯了,当然如果正在解决音讯的 pod 呈现了异样,对于其余 pod,最晚会在 40 秒之后拿到锁,对于大量的音讯来说,这个还是能够容忍的
对应的代码逻辑如下:
-
简略连贯 redis,redis 分布式锁的主逻辑如下
- 连贯 redis,DB 默认为 0 号
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "123456",
DB: 0,
})
func LockMq(svrName string) {key := fmt.Sprintf("%s_lockmq", svrName)
// 尝试加锁
var set bool
for {set = redisLock(key, svrName, time.Second*30)
if set {log.Println("redisLock success")
if err := afterLockSuccess(key); err != nil {
// 如果此处有 err,天然是 mq 初始化失败
log.Println("mq init error:", err)
}else{log.Println("redisLock expire failed")
}
time.Sleep(time.Second * 10)
continue
}
// afterLockFailed()
log.Println("redisLock failed")
time.Sleep(time.Second * 10)
}
}
-
根本的加锁实现
- 设置 key,value,过期工夫为 30 秒
func redisLock(key, value string, duration time.Duration) bool {set, err := rdb.SetNX(context.TODO(), key, value, duration).Result()
if err != nil {log.Println("setnx failed, error:", err)
return false
}
return set
}
- 加锁胜利之后,初始化 mq 客户端并进行生产,续期 redis 分布式锁
func afterLockSuccess(key string) error {
// 初始化须要做的内容或者句柄
// xxx
// 对于此处的初始化 mq 句柄失败才返回 err
ch := make(chan struct{}, 1)
go func() {
// 模仿生产音讯
for {
select {
case <-ch:
log.Println("expire failed,mq close")
return
default:
log.Println("is consuming msg")
time.Sleep(time.Second * 2)
}
}
}()
for {time.Sleep(time.Second*10)
// 续期
set, err := rdb.PExpire(context.TODO(), key, time.Second*30).Result()
if err != nil {log.Println("PExpire error!!", err)
return nil
}
if !set {ch <- struct{}{}
log.Println("PExpire failed!!")
return nil
}
log.Println("PExpire success!!")
}
}
具体的测试间接调用 LockMq 函数即可
func main(){go redislock.LockMq("helloworld")
select{}}
模仿启动多个 pod 去抢锁,抢到锁的执行业务,持续续期,抢不到锁的劳动一会再接着抢
程序 a 先启动,程序 b 后启动
程序 a 日志如下:
程序 a 起来之后,启动一段时间之后,kill 掉 程序 a
程序 b 日志如下:
程序 b 先是获取锁失败,过 30s 左右,程序 b 能失常获取到锁
对于源码能够查看地址:https://github.com/qingconglaixueit/my_redis_demo
感激浏览,欢送交换,点个赞,关注一波 再走吧
欢送点赞,关注,珍藏
敌人们,你的反对和激励,是我保持分享,提高质量的能源
好了,本次就到这里
技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。
我是 阿兵云原生,欢送点赞关注珍藏,下次见~