上一篇文章中引入了音讯队列对秒杀流量做削峰的解决,咱们应用的是Kafka,看起来仿佛工作的不错,但其实还是有很多隐患存在,如果这些隐患不优化解决掉,那么秒杀抢购流动开始后可能会呈现音讯沉积、生产提早、数据不统一、甚至服务解体等问题,那么结果可想而知。本篇文章咱们就一起来把这些隐患解决掉。
批量数据聚合
在SeckillOrder这个办法中,每来一次秒杀抢购申请都往往Kafka中发送一条音讯。如果这个时候有一千万的用户同时来抢购,就算咱们做了各种限流策略,一瞬间还是可能会有上百万的音讯会发到Kafka,会产生大量的网络IO和磁盘IO老本,大家都晓得Kafka是基于日志的音讯零碎,写音讯尽管大多状况下都是程序IO,但当海量的音讯同时写入的时候还是可能会扛不住。
那怎么解决这个问题呢?答案是做音讯的聚合。之前发送一条音讯就会产生一次网络IO和一次磁盘IO,咱们做音讯聚合后,比方聚合100条音讯后再发送给Kafka,这个时候100条音讯才会产生一次网络IO和磁盘IO,对整个Kafka的吞吐和性能是一个十分大的晋升。其实这就是一种小包聚合的思维,或者叫Batch或者批量的思维。这种思维也随处可见,比方咱们应用Mysql插入批量数据的时候,能够通过一条SQL语句执行而不是循环的一条一条插入,还有Redis的Pipeline操作等等。
那怎么来聚合呢,聚合策略是啥呢?聚合策略有两个维度别离是聚合音讯条数和聚合工夫,比方聚合音讯达到100条咱们就往Kafka发送一次,这个条数是能够配置的,那如果始终也达不到100条音讯怎么办呢?通过聚合工夫来兜底,这个聚合工夫也是能够配置的,比方配置聚合工夫为1秒钟,也就是无论目前聚合了多少条音讯只有聚合工夫达到1秒,那么就往Kafka发送一次数据。聚合条数和聚合工夫是或的关系,也就是只有有一个条件满足就触发。
在这里咱们提供一个批量聚合数据的工具Batcher,定义如下
type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup}
Do办法:满足聚合条件后就会执行Do办法,其中val参数为聚合后的数据
Sharding办法:通过Key进行sharding,雷同的key音讯写入到同一个channel中,被同一个goroutine解决
在merge办法中有两个触发执行Do办法的条件,一是当聚合的数据条数大于等于设置的条数,二是当触发设置的定时器
代码实现比较简单,如下为具体实现:
type msg struct { key string val interface{}}type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup}func New(opts ...Option) *Batcher { b := &Batcher{} for _, opt := range opts { opt.apply(&b.opts) } b.opts.check() b.chans = make([]chan *msg, b.opts.worker) for i := 0; i < b.opts.worker; i++ { b.chans[i] = make(chan *msg, b.opts.buffer) } return b}func (b *Batcher) Start() { if b.Do == nil { log.Fatal("Batcher: Do func is nil") } if b.Sharding == nil { log.Fatal("Batcher: Sharding func is nil") } b.wait.Add(len(b.chans)) for i, ch := range b.chans { go b.merge(i, ch) }}func (b *Batcher) Add(key string, val interface{}) error { ch, msg := b.add(key, val) select { case ch <- msg: default: return ErrFull } return nil}func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) { sharding := b.Sharding(key) % b.opts.worker ch := b.chans[sharding] msg := &msg{key: key, val: val} return ch, msg}func (b *Batcher) merge(idx int, ch <-chan *msg) { defer b.wait.Done() var ( msg *msg count int closed bool lastTicker = true interval = b.opts.interval vals = make(map[string][]interface{}, b.opts.size) ) if idx > 0 { interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker))) } ticker := time.NewTicker(interval) for { select { case msg = <-ch: if msg == nil { closed = true break } count++ vals[msg.key] = append(vals[msg.key], msg.val) if count >= b.opts.size { break } continue case <-ticker.C: if lastTicker { ticker.Stop() ticker = time.NewTicker(b.opts.interval) lastTicker = false } } if len(vals) > 0 { ctx := context.Background() b.Do(ctx, vals) vals = make(map[string][]interface{}, b.opts.size) count = 0 } if closed { ticker.Stop() return } }}func (b *Batcher) Close() { for _, ch := range b.chans { ch <- nil } b.wait.Wait()}
应用的时候须要先创立一个Batcher,而后定义Batcher的Sharding办法和Do办法,在Sharding办法中通过ProductID把不同商品的聚合投递到不同的goroutine中解决,在Do办法中咱们把聚合的数据一次性批量的发送到Kafka,定义如下:
b := batcher.New( batcher.WithSize(batcherSize), batcher.WithBuffer(batcherBuffer), batcher.WithWorker(batcherWorker), batcher.WithInterval(batcherInterval),)b.Sharding = func(key string) int { pid, _ := strconv.ParseInt(key, 10, 64) return int(pid) % batcherWorker}b.Do = func(ctx context.Context, val map[string][]interface{}) { var msgs []*KafkaData for _, vs := range val { for _, v := range vs { msgs = append(msgs, v.(*KafkaData)) } } kd, err := json.Marshal(msgs) if err != nil { logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err) } if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil { logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err) }}s.batcher = bs.batcher.Start()
在SeckillOrder办法中不再是每来一次申请就往Kafka中投递一次音讯,而是先通过batcher提供的Add办法增加到Batcher中期待满足聚合条件后再往Kafka中投递。
err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})if err!= nil { logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)}
升高音讯的生产提早
通过批量音讯解决的思维,咱们提供了Batcher工具,晋升了性能,但这次要是针对生产端而言的。当咱们生产到批量的数据后,还是须要串行的一条条的解决数据,那有没有方法能减速生产从而升高生产音讯的提早呢?有两种计划别离是:
- 减少消费者的数量
- 在一个消费者中减少音讯解决的并行度
因为在Kafka中,一个Topci能够配置多个Partition,数据会被均匀或者依照生产者指定的形式写入到多个分区中,那么在生产的时候,Kafka约定一个分区只能被一个消费者生产,为什么要这么设计呢?我了解的是如果有多个Consumer同时生产一个分区的数据,那么在操作这个生产进度的时候就须要加锁,对性能影响比拟大。所以说当消费者数量小于分区数量的时候,咱们能够减少消费者的数量来减少音讯解决能力,但当消费者数量大于分区的时候再持续减少消费者数量就没有意义了。
不能减少Consumer的时候,能够在同一个Consumer中晋升解决音讯的并行度,即通过多个goroutine来并行的生产数据,咱们一起来看看如何通过多个goroutine来生产音讯。
在Service中定义msgsChan,msgsChan为Slice,Slice的长度示意有多少个goroutine并行的解决数据,初始化如下:
func NewService(c config.Config) *Service { s := &Service{ c: c, ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)), OrderRPC: order.NewOrder(zrpc.MustNewClient(c.OrderRPC)), msgsChan: make([]chan *KafkaData, chanCount), } for i := 0; i < chanCount; i++ { ch := make(chan *KafkaData, bufferCount) s.msgsChan[i] = ch s.waiter.Add(1) go s.consume(ch) } return s}
从Kafka中生产到数据后,把数据投递到Channel中,留神投递音讯的时候依照商品的id做Sharding,这能保障在同一个Consumer中对同一个商品的解决是串行的,串行的数据处理不会导致并发带来的数据竞争问题
func (s *Service) Consume(_ string, value string) error { logx.Infof("Consume value: %s\n", value) var data []*KafkaData if err := json.Unmarshal([]byte(value), &data); err != nil { return err } for _, d := range data { s.msgsChan[d.Pid%chanCount] <- d } return nil}
咱们定义了chanCount个goroutine同时解决数据,每个channel的长度定义为bufferCount,并行处理数据的办法为consume,如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err) return } if p.Stock <= 0 { logx.Errorf("stock is zero pid: %d", m.Pid) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } }}
怎么保障不会超卖
当秒杀流动开始后,大量用户点击商品详情页上的秒杀按钮,会产生大量的并发申请查问库存,一旦某个申请查问到有库存,紧接着零碎就会进行库存的扣减。而后,系统生成理论的订单,并进行后续的解决。如果申请查不到库存,就会返回,用户通常会持续点击秒杀按钮,持续查问库存。简略来说,这个阶段的操作就是三个:查看库存,库存扣减、和订单解决。因为每个秒杀申请都会查问库存,而申请只有查到库存无余量后,后续的库存扣减和订单解决才会被执行,所以,这个阶段中最大的并发压力都在库存查看操作上。
为了撑持大量高并发的库存查看申请,咱们须要应用Redis独自保留库存量。那么,库存扣减和订单解决是否都能够交给Mysql来解决呢?其实,订单的解决是能够在数据库中执行的,但库存扣减操作不能交给Mysql间接解决。因为到了理论的订单解决环节,申请的压力曾经不大了,数据库齐全能够撑持这些订单解决申请。那为什么库存扣减不能间接在数据库中执行呢?这是因为,一旦申请查到有库存,就意味着该申请取得购买资格,紧接着就会进行下单操作,同时库存量会减一,这个时候如果间接操作数据库来扣减库存可能就会导致超卖问题。
间接操作数据库扣减库存为什么会导致超卖呢?因为数据库的处理速度较慢,不能及时更新库存余量,这就会导致大量的查问库存的申请读取到旧的库存值,并进行下单,此时就会呈现下单数量大于理论的库存量,导致超卖。所以,就须要间接在Redis中进行库存扣减,具体的操作是,当库存查看完后,一旦库存无余量,咱们就立刻在Redis中扣减库存,同时,为了防止申请查问到旧的库存值,库存检查和库存扣减这两个操作须要保障原子性。
咱们应用Redis的Hash来存储库存,total为总库存,seckill为已秒杀的数量,为了保障查问库存和减库存的原子性,咱们应用Lua脚本进行原子操作,让秒杀量小于库存的时候返回1,示意秒杀胜利,否则返回0,示意秒杀失败,代码如下:
const ( luaCheckAndUpdateScript = `local counts = redis.call("HMGET", KEYS[1], "total", "seckill")local total = tonumber(counts[1])local seckill = tonumber(counts[2])if seckill + 1 <= total then redis.call("HINCRBY", KEYS[1], "seckill", 1) return 1endreturn 0`)func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) { val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)}) if err != nil { return nil, err } if val.(int64) == 0 { return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId)) } return &product.CheckAndUpdateStockResponse{}, nil}func stockKey(pid int64) string { return fmt.Sprintf("stock:%d", pid)}
对应的seckill-rmq代码批改如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } }}
到这里,咱们曾经理解了如何应用原子性的Lua脚本来实现库存的检查和扣减。其实要想保障库存检查和扣减的原子性,还有另外一种办法,那就是应用分布式锁。
分布式锁的实现形式有很多种,能够基于Redis、Etcd等等,用Redis实现分布式锁的文章比拟多,感兴趣的能够自行搜寻参考。这里给大家简略介绍下基于Etcd来实现分布式锁。为了简化分布式锁、分布式选举、分布式事务的实现,etcd社区提供了一个名为concurrency的包来帮忙咱们更简略、正确的应用分布式锁。它的实现非常简单,次要流程如下:
- 首先通过concurrency.NewSession办法创立Session,实质上是创立了一个TTL为10的Lease
- 失去Session对象后,通过concurrency.NewMutex创立一个mutex对象,包含了Lease、key prefix等信息
- 而后听过mutex对象的Lock办法尝试获取锁
- 最初通过mutex对象的Unlock办法开释锁
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})if err != nil { log.Fatal(err)}defer cli.Close()session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))if err != nil { log.Fatal(err)}defer session.Close()mux := concurrency.NewMutex(session, "lock")if err := mux.Lock(context.Background()); err != nil { log.Fatal(err)}if err := mux.Unlock(context.Background()); err != nil { log.Fatal(err)}
结束语
本篇文章次要是针对秒杀性能持续做了一些优化。在Kafka音讯的生产端做了批量音讯聚合发送的优化,Batch思维在理论生产开发中应用十分多,心愿大家可能活灵便用,在音讯的生产端通过减少并行度来晋升吞吐能力,这也是晋升性能罕用的优化伎俩。最初介绍了可能导致超卖的起因,以及给出了绝对应的解决方案。同时,介绍了基于Etcd的分布式锁,在分布式服务中经常出现数据竞争的问题,个别能够通过分布式锁来解决,但分布式锁的引入势必会导致性能的降落,所以,还须要结合实际状况思考是否须要引入分布式锁。
心愿本篇文章对你有所帮忙,谢谢。
每周一、周四更新
代码仓库: https://github.com/zhoushuguang/lebron
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际』公众号并点击 交换群 获取社区群二维码。