乐趣区

关于go:gozero微服务实战系列九极致优化秒杀性能

上一篇文章中引入了音讯队列对秒杀流量做削峰的解决,咱们应用的是 Kafka,看起来仿佛工作的不错,但其实还是有很多隐患存在,如果这些隐患不优化解决掉,那么秒杀抢购流动开始后可能会呈现音讯沉积、生产提早、数据不统一、甚至服务解体等问题,那么结果可想而知。本篇文章咱们就一起来把这些隐患解决掉。

批量数据聚合

SeckillOrder 这个办法中,每来一次秒杀抢购申请都往往 Kafka 中发送一条音讯。如果这个时候有一千万的用户同时来抢购,就算咱们做了各种限流策略,一瞬间还是可能会有上百万的音讯会发到 Kafka,会产生大量的网络 IO 和磁盘 IO 老本,大家都晓得 Kafka 是基于日志的音讯零碎,写音讯尽管大多状况下都是程序 IO,但当海量的音讯同时写入的时候还是可能会扛不住。

那怎么解决这个问题呢?答案是做音讯的聚合。之前发送一条音讯就会产生一次网络 IO 和一次磁盘 IO,咱们做音讯聚合后,比方聚合 100 条音讯后再发送给 Kafka,这个时候 100 条音讯才会产生一次网络 IO 和磁盘 IO,对整个 Kafka 的吞吐和性能是一个十分大的晋升。其实这就是一种小包聚合的思维,或者叫 Batch 或者批量的思维。这种思维也随处可见,比方咱们应用 Mysql 插入批量数据的时候,能够通过一条 SQL 语句执行而不是循环的一条一条插入,还有 Redis 的 Pipeline 操作等等。

那怎么来聚合呢,聚合策略是啥呢?聚合策略有两个维度别离是聚合音讯条数和聚合工夫,比方聚合音讯达到 100 条咱们就往 Kafka 发送一次,这个条数是能够配置的,那如果始终也达不到 100 条音讯怎么办呢?通过聚合工夫来兜底,这个聚合工夫也是能够配置的,比方配置聚合工夫为 1 秒钟,也就是无论目前聚合了多少条音讯只有聚合工夫达到 1 秒,那么就往 Kafka 发送一次数据。聚合条数和聚合工夫是或的关系,也就是只有有一个条件满足就触发。

在这里咱们提供一个批量聚合数据的工具 Batcher,定义如下

type Batcher struct {
  opts options

  Do       func(ctx context.Context, val map[string][]interface{})
  Sharding func(key string) int
  chans    []chan *msg
  wait     sync.WaitGroup
}

Do 办法:满足聚合条件后就会执行 Do 办法,其中 val 参数为聚合后的数据

Sharding 办法:通过 Key 进行 sharding,雷同的 key 音讯写入到同一个 channel 中,被同一个 goroutine 解决

在 merge 办法中有两个触发执行 Do 办法的条件,一是当聚合的数据条数大于等于设置的条数,二是当触发设置的定时器

代码实现比较简单,如下为具体实现:

type msg struct {
  key string
  val interface{}}

type Batcher struct {
  opts options

  Do       func(ctx context.Context, val map[string][]interface{})
  Sharding func(key string) int
  chans    []chan *msg
  wait     sync.WaitGroup
}

func New(opts ...Option) *Batcher {b := &Batcher{}
  for _, opt := range opts {opt.apply(&b.opts)
  }
  b.opts.check()

  b.chans = make([]chan *msg, b.opts.worker)
  for i := 0; i < b.opts.worker; i++ {b.chans[i] = make(chan *msg, b.opts.buffer)
  }
  return b
}

func (b *Batcher) Start() {
  if b.Do == nil {log.Fatal("Batcher: Do func is nil")
  }
  if b.Sharding == nil {log.Fatal("Batcher: Sharding func is nil")
  }
  b.wait.Add(len(b.chans))
  for i, ch := range b.chans {go b.merge(i, ch)
  }
}

func (b *Batcher) Add(key string, val interface{}) error {ch, msg := b.add(key, val)
  select {
  case ch <- msg:
  default:
    return ErrFull
  }
  return nil
}

func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) {sharding := b.Sharding(key) % b.opts.worker
  ch := b.chans[sharding]
  msg := &msg{key: key, val: val}
  return ch, msg
}

func (b *Batcher) merge(idx int, ch <-chan *msg) {defer b.wait.Done()

  var (
    msg        *msg
    count      int
    closed     bool
    lastTicker = true
    interval   = b.opts.interval
    vals       = make(map[string][]interface{}, b.opts.size)
  )
  if idx > 0 {interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker)))
  }
  ticker := time.NewTicker(interval)
  for {
    select {
    case msg = <-ch:
      if msg == nil {
        closed = true
        break
      }
      count++
      vals[msg.key] = append(vals[msg.key], msg.val)
      if count >= b.opts.size {break}
      continue
    case <-ticker.C:
      if lastTicker {ticker.Stop()
        ticker = time.NewTicker(b.opts.interval)
        lastTicker = false
      }
    }
    if len(vals) > 0 {ctx := context.Background()
      b.Do(ctx, vals)
      vals = make(map[string][]interface{}, b.opts.size)
      count = 0
    }
    if closed {ticker.Stop()
      return
    }
  }
}

func (b *Batcher) Close() {
  for _, ch := range b.chans {ch <- nil}
  b.wait.Wait()}

应用的时候须要先创立一个 Batcher,而后定义 Batcher 的 Sharding 办法和 Do 办法,在 Sharding 办法中通过 ProductID 把不同商品的聚合投递到不同的 goroutine 中解决,在 Do 办法中咱们把聚合的数据一次性批量的发送到 Kafka,定义如下:

b := batcher.New(batcher.WithSize(batcherSize),
  batcher.WithBuffer(batcherBuffer),
  batcher.WithWorker(batcherWorker),
  batcher.WithInterval(batcherInterval),
)
b.Sharding = func(key string) int {pid, _ := strconv.ParseInt(key, 10, 64)
  return int(pid) % batcherWorker
}
b.Do = func(ctx context.Context, val map[string][]interface{}) {var msgs []*KafkaData
  for _, vs := range val {
    for _, v := range vs {msgs = append(msgs, v.(*KafkaData))
    }
  }
  kd, err := json.Marshal(msgs)
  if err != nil {logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err)
  }
  if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil {logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err)
  }
}
s.batcher = b
s.batcher.Start()

SeckillOrder 办法中不再是每来一次申请就往 Kafka 中投递一次音讯,而是先通过 batcher 提供的 Add 办法增加到 Batcher 中期待满足聚合条件后再往 Kafka 中投递。

err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})
if err!= nil {logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)
}

升高音讯的生产提早

通过批量音讯解决的思维,咱们提供了 Batcher 工具,晋升了性能,但这次要是针对生产端而言的。当咱们生产到批量的数据后,还是须要串行的一条条的解决数据,那有没有方法能减速生产从而升高生产音讯的提早呢?有两种计划别离是:

  • 减少消费者的数量
  • 在一个消费者中减少音讯解决的并行度

因为在 Kafka 中,一个 Topci 能够配置多个 Partition,数据会被均匀或者依照生产者指定的形式写入到多个分区中,那么在生产的时候,Kafka 约定一个分区只能被一个消费者生产,为什么要这么设计呢?我了解的是如果有多个 Consumer 同时生产一个分区的数据,那么在操作这个生产进度的时候就须要加锁,对性能影响比拟大。所以说当消费者数量小于分区数量的时候,咱们能够减少消费者的数量来减少音讯解决能力,但当消费者数量大于分区的时候再持续减少消费者数量就没有意义了。

不能减少 Consumer 的时候,能够在同一个 Consumer 中晋升解决音讯的并行度,即通过多个 goroutine 来并行的生产数据,咱们一起来看看如何通过多个 goroutine 来生产音讯。

在 Service 中定义 msgsChan,msgsChan 为 Slice,Slice 的长度示意有多少个 goroutine 并行的解决数据,初始化如下:

func NewService(c config.Config) *Service {
  s := &Service{
    c:          c,
    ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)),
    OrderRPC:   order.NewOrder(zrpc.MustNewClient(c.OrderRPC)),
    msgsChan:   make([]chan *KafkaData, chanCount),
  }
  for i := 0; i < chanCount; i++ {ch := make(chan *KafkaData, bufferCount)
    s.msgsChan[i] = ch
    s.waiter.Add(1)
    go s.consume(ch)
  }

  return s
}

从 Kafka 中生产到数据后,把数据投递到 Channel 中,留神投递音讯的时候依照商品的 id 做 Sharding,这能保障在同一个 Consumer 中对同一个商品的解决是串行的,串行的数据处理不会导致并发带来的数据竞争问题

func (s *Service) Consume(_ string, value string) error {logx.Infof("Consume value: %s\n", value)
  var data []*KafkaData
  if err := json.Unmarshal([]byte(value), &data); err != nil {return err}
  for _, d := range data {s.msgsChan[d.Pid%chanCount] <- d
  }
  return nil
}

咱们定义了 chanCount 个 goroutine 同时解决数据,每个 channel 的长度定义为 bufferCount,并行处理数据的办法为 consume,如下:

func (s *Service) consume(ch chan *KafkaData) {defer s.waiter.Done()

  for {
    m, ok := <-ch
    if !ok {log.Fatal("seckill rmq exit")
    }
    fmt.Printf("consume msg: %+v\n", m)
    p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid})
    if err != nil {logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err)
      return
    }
    if p.Stock <= 0 {logx.Errorf("stock is zero pid: %d", m.Pid)
      return
    }
    _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
    if err != nil {logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
      return
    }
    _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
    if err != nil {logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
    }
  }
}

怎么保障不会超卖

当秒杀流动开始后,大量用户点击商品详情页上的秒杀按钮,会产生大量的并发申请查问库存,一旦某个申请查问到有库存,紧接着零碎就会进行库存的扣减。而后,系统生成理论的订单,并进行后续的解决。如果申请查不到库存,就会返回,用户通常会持续点击秒杀按钮,持续查问库存。简略来说,这个阶段的操作就是三个:查看库存,库存扣减、和订单解决。因为每个秒杀申请都会查问库存,而申请只有查到库存无余量后,后续的库存扣减和订单解决才会被执行,所以,这个阶段中最大的并发压力都在库存查看操作上。

为了撑持大量高并发的库存查看申请,咱们须要应用 Redis 独自保留库存量。那么,库存扣减和订单解决是否都能够交给 Mysql 来解决呢?其实,订单的解决是能够在数据库中执行的,但库存扣减操作不能交给 Mysql 间接解决。因为到了理论的订单解决环节,申请的压力曾经不大了,数据库齐全能够撑持这些订单解决申请。那为什么库存扣减不能间接在数据库中执行呢?这是因为,一旦申请查到有库存,就意味着该申请取得购买资格,紧接着就会进行下单操作,同时库存量会减一,这个时候如果间接操作数据库来扣减库存可能就会导致超卖问题。

间接操作数据库扣减库存为什么会导致超卖呢?因为数据库的处理速度较慢,不能及时更新库存余量,这就会导致大量的查问库存的申请读取到旧的库存值,并进行下单,此时就会呈现下单数量大于理论的库存量,导致超卖。所以,就须要间接在 Redis 中进行库存扣减,具体的操作是,当库存查看完后,一旦库存无余量,咱们就立刻在 Redis 中扣减库存,同时,为了防止申请查问到旧的库存值,库存检查和库存扣减这两个操作须要保障原子性。

咱们应用 Redis 的 Hash 来存储库存,total 为总库存,seckill 为已秒杀的数量,为了保障查问库存和减库存的原子性,咱们应用 Lua 脚本进行原子操作,让秒杀量小于库存的时候返回 1,示意秒杀胜利,否则返回 0,示意秒杀失败,代码如下:

const (
  luaCheckAndUpdateScript = `
local counts = redis.call("HMGET", KEYS[1], "total", "seckill")
local total = tonumber(counts[1])
local seckill = tonumber(counts[2])
if seckill + 1 <= total then
  redis.call("HINCRBY", KEYS[1], "seckill", 1)
  return 1
end
return 0
`
)

func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) {val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)})
  if err != nil {return nil, err}
  if val.(int64) == 0 {return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId))
  }
  return &product.CheckAndUpdateStockResponse{}, nil}

func stockKey(pid int64) string {return fmt.Sprintf("stock:%d", pid)
}

对应的 seckill-rmq 代码批改如下:

func (s *Service) consume(ch chan *KafkaData) {defer s.waiter.Done()

  for {
    m, ok := <-ch
    if !ok {log.Fatal("seckill rmq exit")
    }
    fmt.Printf("consume msg: %+v\n", m)
    _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid})
    if err != nil {logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err)
      return
    }
    _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
    if err != nil {logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
      return
    }
    _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
    if err != nil {logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
    }
  }
}

到这里,咱们曾经理解了如何应用原子性的 Lua 脚本来实现库存的检查和扣减。其实要想保障库存检查和扣减的原子性,还有另外一种办法,那就是应用分布式锁。

分布式锁的实现形式有很多种,能够基于 Redis、Etcd 等等,用 Redis 实现分布式锁的文章比拟多,感兴趣的能够自行搜寻参考。这里给大家简略介绍下基于 Etcd 来实现分布式锁。为了简化分布式锁、分布式选举、分布式事务的实现,etcd 社区提供了一个名为 concurrency 的包来帮忙咱们更简略、正确的应用分布式锁。它的实现非常简单,次要流程如下:

  • 首先通过 concurrency.NewSession 办法创立 Session,实质上是创立了一个 TTL 为 10 的 Lease
  • 失去 Session 对象后,通过 concurrency.NewMutex 创立一个 mutex 对象,包含了 Lease、key prefix 等信息
  • 而后听过 mutex 对象的 Lock 办法尝试获取锁
  • 最初通过 mutex 对象的 Unlock 办法开释锁
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {log.Fatal(err)
}
defer cli.Close()

session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {log.Fatal(err)
}
defer session.Close()

mux := concurrency.NewMutex(session, "lock")
if err := mux.Lock(context.Background()); err != nil {log.Fatal(err)
}


if err := mux.Unlock(context.Background()); err != nil {log.Fatal(err)
}

结束语

本篇文章次要是针对秒杀性能持续做了一些优化。在 Kafka 音讯的生产端做了批量音讯聚合发送的优化,Batch 思维在理论生产开发中应用十分多,心愿大家可能活灵便用,在音讯的生产端通过减少并行度来晋升吞吐能力,这也是晋升性能罕用的优化伎俩。最初介绍了可能导致超卖的起因,以及给出了绝对应的解决方案。同时,介绍了基于 Etcd 的分布式锁,在分布式服务中经常出现数据竞争的问题,个别能够通过分布式锁来解决,但分布式锁的引入势必会导致性能的降落,所以,还须要结合实际状况思考是否须要引入分布式锁。

心愿本篇文章对你有所帮忙,谢谢。

每周一、周四更新

代码仓库: https://github.com/zhoushuguang/lebron

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。

退出移动版