乐趣区

关于go:gozero微服务实战系列五缓存代码怎么写

缓存是高并发服务的根底,毫不夸大的说没有缓存高并发服务就无从谈起。本我的项目缓存应用 Redis,Redis 是目前支流的缓存数据库,反对丰盛的数据类型,其中汇合类型的底层次要依赖:整数数组、双向链表、哈希表、压缩列表和跳表五种数据结构。因为底层依赖的数据结构的高效性以及基于多路复用的高性能 I / O 模型,所以 Redis 也提供了十分强悍的性能。下图展现了 Redis 数据类型对应的底层数据结构。

根本应用

在 go-zero 中默认集成了缓存 model 数据的性能,咱们在应用 goctl 主动生成 model 代码的时候加上 -c 参数即可生成集成缓存的 model 代码

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:3306)/product" -table="*"  -dir="./model" -c

通过简略的配置咱们就能够应用 model 层的缓存啦,model 层缓存默认过期工夫为 7 天,如果没有查到数据会设置一个空缓存,空缓存的过期工夫为 1 分钟,model 层 cache 配置和初始化如下:

CacheRedis:
  - Host: 127.0.0.1:6379
    Type: node
CategoryModel: model.NewCategoryModel(conn, c.CacheRedis)

这次演示的代码次要会基于 product-rpc 服务,为了简略咱们间接应用 grpcurl 来进行调试,留神启动的时候次要注册反射服务,通过 goctl 主动生成的 rpc 服务在 dev 或 test 环境下曾经帮咱们注册好了,咱们须要把咱们的 mode 设置为 dev,默认的 mode 为 pro,如下代码所示:

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {product.RegisterProductServer(grpcServer, svr)
    if c.Mode == service.DevMode || c.Mode == service.TestMode {reflection.Register(grpcServer)
    }
})

间接应用 go install 装置 grpcurl 工具,so easy!!!妈妈再也不必放心我不会调试 gRPC 了

go install github.com/fullstorydev/grpcurl/cmd/grpcurl

启动服务,通过如下命令查问服务,服务提供的办法,能够看到以后提供了 Product 获取商品详情接口和 Products 批量获取商品详情接口

~ grpcurl -plaintext 127.0.0.1:8081 list

grpc.health.v1.Health
grpc.reflection.v1alpha.ServerReflection
product.Product

~ grpcurl -plaintext 127.0.0.1:8081 list product.Product
product.Product.Product
product.Product.Products

咱们先往 product 表里插入一些测试数据,测试数据放在 lebron/sql/data.sql 文件中,此时咱们查看 id 为 1 的商品数据,这时候缓存中是没有 id 为 1 这条数据的

127.0.0.1:6379> EXISTS cache:product:product:id:1
(integer) 0

通过 grpcurl 工具来调用 Product 接口查问 id 为 1 的商品数据,能够看到曾经返回了数据

~ grpcurl -plaintext -d '{"product_id": 1}' 127.0.0.1:8081 product.Product.Product

{
  "productId": "1",
  "name": "夹克 1"
}

再看 redis 中曾经存在了 id 为 1 的这条数据的缓存,这就是框架给咱们主动生成的缓存

127.0.0.1:6379> get cache:product:product:id:1

{\"Id\":1,\"Cateid\":2,\"Name\":\"\xe5\xa4\xb9\xe5\x85\x8b1\",\"Subtitle\":\"\xe5\xa4\xb9\xe5\x85\x8b1\",\"Images\":\"1.jpg,2.jpg,3.jpg\",\"Detail\":\"\xe8\xaf\xa6\xe6\x83\x85\",\"Price\":100,\"Stock\":10,\"Status\":1,\"CreateTime\":\"2022-06-17T17:51:23Z\",\"UpdateTime\":\"2022-06-17T17:51:23Z\"}

咱们再申请 id 为 666 的商品,因为咱们表里没有 id 为 666 的商品,框架会帮咱们缓存一个空值,这个空值的过期工夫为 1 分钟

127.0.0.1:6379> get cache:product:product:id:666
"*"

当咱们删除数据或者更新数据的时候,以 id 为 key 的行记录缓存会被删除

缓存索引

咱们的分类商品列表是须要反对分页的,通过往上滑动能够一直地加载下一页,商品依照创立工夫倒序返回列表,应用游标的形式进行分页。

怎么在缓存中存储分类的商品呢?咱们应用 Sorted Set 来存储,member 为商品的 id,即咱们只在 Sorted Set 中存储缓存索引,查出缓存索引后,因为咱们主动生成了以主键 id 索引为 key 的缓存,所以查出索引列表后咱们再查问行记录缓存即可获取商品的详情,Sorted Set 的 score 为商品的创立工夫。

上面咱们一起来剖析分类商品列表的逻辑该怎么写,首先先从缓存中读取当前页的商品 id 索引,调用 cacheProductList 办法,留神,这里调用查问缓存办法疏忽了 error,为什么要疏忽这个 error 呢,因为咱们冀望的是尽最大可能的给用户返回数据,也就是 redis 挂掉了的话那咱们就会从数据库查问数据返回给用户,而不会因为 redis 挂掉而返回谬误。

pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))

cacheProductList 办法实现如下,通过 ZrevrangebyscoreWithScoresAndLimitCtx 倒序从缓存中读数据,并限度读条数为分页大小

func (l *ProductListLogic) cacheProductList(ctx context.Context, cid int32, cursor, ps int64) ([]int64, error) {pairs, err := l.svcCtx.BizRedis.ZrevrangebyscoreWithScoresAndLimitCtx(ctx, categoryKey(cid), cursor, 0, 0, int(ps))
  if err != nil {return nil, err}
  var ids []int64
  for _, pair := range pairs {id, _ := strconv.ParseInt(pair.Key, 10, 64)
    ids = append(ids, id)
  }
  return ids, nil
}

为了示意列表的完结,咱们会在 Sorted Set 中设置一个完结标志符,该标志符的 member 为 -1,score 为 0,所以咱们在从缓存中查出数据后,须要判断数据的最初一条是否为 -1,如果为 - 1 的话阐明列表曾经加载到最初一页了,用户再滑动屏幕的话前端就不会再持续申请后端的接口了,逻辑如下,从缓存中查出数据后再依据主键 id 查问商品的详情即可

pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))
if len(pids) == int(in.Ps) {
  isCache = true
  if pids[len(pids)-1] == -1 {isEnd = true}
}

如果从缓存中查出的数据为 0 条,那么咱们就从数据库中查问该分类下的数据,这里要留神从数据库查问数据的时候咱们要限度查问的条数,咱们默认一次查问 300 条,因为咱们每页大小为 10,300 条能够让用户下翻 30 页,大多数状况下用户基本不会翻那么多页,所以咱们不会全副加载以升高咱们的缓存资源,当用户真的翻页超过 30 页后,咱们再按需加载到缓存中

func (m *defaultProductModel) CategoryProducts(ctx context.Context, cateid, ctime, limit int64) ([]*Product, error) {var products []*Product
  err := m.QueryRowsNoCacheCtx(ctx, &products, fmt.Sprintf("select %s from %s where cateid=? and status=1 and create_time<? order by create_time desc limit ?", productRows, m.table), cateid, ctime, limit)
  if err != nil {return nil, err}
  return products, nil
}

获取到当前页的数据后,咱们还须要做去重,因为如果咱们只以 createTime 作为游标的话,很可能数据会反复,所以咱们还须要加上 id 作为去重条件,去重逻辑如下

for k, p := range firstPage {
      if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {firstPage = firstPage[k:]
        break
      }
}

最初,如果没有命中缓存的话,咱们须要把从数据库查出的数据写入缓存,这里须要留神的是如果数据曾经到了开端须要加上数据完结的标识符,即 val 为 -1,score 为 0,这里咱们异步的写会缓存,因为写缓存并不是主逻辑,不须要期待实现,写失败也没有影响呢,通过异步形式升高接口耗时,处处都有小优化呢

if !isCache {threading.GoSafe(func() {if len(products) < defaultLimit && len(products) > 0 {endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")
        products = append(products, &model.Product{Id: -1, CreateTime: endTime})
      }
      _ = l.addCacheProductList(context.Background(), products)
    })
}

能够看出想要写一个残缺的基于游标分页的逻辑还是比较复杂的,有很多细节须要思考,大家平时在写相似代码时肯定要仔细,该办法的整体代码如下:

func (l *ProductListLogic) ProductList(in *product.ProductListRequest) (*product.ProductListResponse, error) {_, err := l.svcCtx.CategoryModel.FindOne(l.ctx, int64(in.CategoryId))
  if err == model.ErrNotFound {return nil, status.Error(codes.NotFound, "category not found")
  }
  if in.Cursor == 0 {in.Cursor = time.Now().Unix()}
  if in.Ps == 0 {in.Ps = defaultPageSize}
  var (
    isCache, isEnd   bool
    lastID, lastTime int64
    firstPage        []*product.ProductItem
    products         []*model.Product)
  pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))
  if len(pids) == int(in.Ps) {
    isCache = true
    if pids[len(pids)-1] == -1 {isEnd = true}
    products, err := l.productsByIds(l.ctx, pids)
    if err != nil {return nil, err}
    for _, p := range products {
      firstPage = append(firstPage, &product.ProductItem{
        ProductId:  p.Id,
        Name:       p.Name,
        CreateTime: p.CreateTime.Unix(),})
    }
  } else {
    var (
      err   error
      ctime = time.Unix(in.Cursor, 0).Format("2006-01-02 15:04:05")
    )
    products, err = l.svcCtx.ProductModel.CategoryProducts(l.ctx, ctime, int64(in.CategoryId), defaultLimit)
    if err != nil {return nil, err}
    var firstPageProducts []*model.Product
    if len(products) > int(in.Ps) {firstPageProducts = products[:int(in.Ps)]
    } else {
      firstPageProducts = products
      isEnd = true
    }
    for _, p := range firstPageProducts {
      firstPage = append(firstPage, &product.ProductItem{
        ProductId:  p.Id,
        Name:       p.Name,
        CreateTime: p.CreateTime.Unix(),})
    }
  }
  if len(firstPage) > 0 {pageLast := firstPage[len(firstPage)-1]
    lastID = pageLast.ProductId
    lastTime = pageLast.CreateTime
    if lastTime < 0 {lastTime = 0}
    for k, p := range firstPage {
      if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {firstPage = firstPage[k:]
        break
      }
    }
  }
  ret := &product.ProductListResponse{
    IsEnd:     isEnd,
    Timestamp: lastTime,
    ProductId: lastID,
    Products:  firstPage,
  }
  if !isCache {threading.GoSafe(func() {if len(products) < defaultLimit && len(products) > 0 {endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")
        products = append(products, &model.Product{Id: -1, CreateTime: endTime})
      }
      _ = l.addCacheProductList(context.Background(), products)
    })
  }
  return ret, nil
}

咱们通过 grpcurl 工具申请 ProductList 接口后返回数据的同时也写进了缓存索引中,当下次再申请的时候就间接从缓存中读取

grpcurl -plaintext -d '{"category_id": 8}' 127.0.0.1:8081 product.Product.ProductList

缓存击穿

缓存击穿是指拜访某个十分热的数据,缓存不存在,导致大量的申请发送到了数据库,这会导致数据库压力陡增,缓存击穿常常产生在热点数据过期生效时,如下图所示:

既然缓存击穿常常产生在热点数据过期生效的时候,那么咱们不让缓存生效不就好了,每次查问缓存的时候不要应用 Exists 来判断 key 是否存在,而是应用 Expire 给缓存续期,通过 Expire 返回后果判断 key 是否存在,既然是热点数据通过一直地续期也就不会过期了

还有一种简略无效的办法就是通过 singleflight 来管制,singleflight 的原理是当同时有很多申请同时到来时,最终只有一个申请会最终拜访到资源,其余申请都会期待后果而后返回。获取商品详情应用 singleflight 进行爱护示例如下:

func (l *ProductLogic) Product(in *product.ProductItemRequest) (*product.ProductItem, error) {v, err, _ := l.svcCtx.SingleGroup.Do(fmt.Sprintf("product:%d", in.ProductId), func() (interface{}, error) {return l.svcCtx.ProductModel.FindOne(l.ctx, in.ProductId)
  })
  if err != nil {return nil, err}
  p := v.(*model.Product)
  return &product.ProductItem{
    ProductId: p.Id,
    Name:      p.Name,
  }, nil
}

缓存穿透

缓存穿透是指要拜访的数据既不在缓存中,也不在数据库中,导致申请在拜访缓存时,产生缓存缺失,再去拜访数据库时,发现数据库中也没有要拜访的数据。此时也就没方法从数据库中读出数据再写入缓存来服务后续的申请,相似的申请如果多的话就会给缓存和数据库带来微小的压力。

针对缓存穿透问题,解决办法其实很简略,就是缓存一个空值,防止每次都透传到数据库,缓存的工夫能够设置短一点,比方 1 分钟,其实上文曾经有提到了,当咱们拜访不存在的数据的时候,go-zero 框架会帮咱们主动加上空缓存,比方咱们拜访 id 为 999 的商品,该商品在数据库中是不存在的。

grpcurl -plaintext -d '{"product_id": 999}' 127.0.0.1:8081 product.Product.Product

此时查看缓存,曾经帮我增加好了空缓存

127.0.0.1:6379> get cache:product:product:id:999
"*"

缓存雪崩

缓存雪崩时指大量的的利用申请无奈在 Redis 缓存中进行解决,紧接着利用将大量的申请发送到数据库,导致数据库被打挂,好惨呐!!缓存雪崩个别是由两个起因导致的,应答计划也不太一样。

第一个起因是:缓存中有大量的数据同时过期,导致大量的申请无奈失去失常解决。

针对大量数据同时生效带来的缓存雪崩问题,个别的解决方案是要防止大量的数据设置雷同的过期工夫,如果业务上确实有要求数据要同时生效,那么能够在过期工夫上加一个较小的随机数,这样不同的数据过期工夫不同,但差异也不大,防止大量数据同时过期,也根本能满足业务的需要。

第二个起因是:Redis 呈现了宕机,没方法失常响应申请了,这就会导致大量申请间接打到数据库,从而产生雪崩

针对这类起因个别咱们须要让咱们的数据库反对熔断,让数据库压力比拟大的时候就触发熔断,抛弃掉局部申请,当然熔断是对业务有损的。

在 go-zero 的数据库客户端是反对熔断的,如下在 ExecCtx 办法中应用熔断进行爱护

func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) {ctx, span := startSpan(ctx, "Exec")
  defer func() {endSpan(span, err)
  }()

  err = db.brk.DoWithAcceptable(func() error {
    var conn *sql.DB
    conn, err = db.connProv()
    if err != nil {db.onError(err)
      return err
    }

    result, err = exec(ctx, conn, q, args...)
    return err
  }, db.acceptable)

  return
}

结束语

本篇文章先介绍了 go-zero 中缓存应用的根本姿态,接着具体介绍了使游标通过缓存索引来实现分页性能,紧接着介绍了缓存击穿、缓存穿透、缓存雪崩的概念和应答计划。缓存对于高并发零碎来说是重中之重,然而缓存的应用坑还是挺多的,大家在平时我的项目开发中肯定要十分认真,如果使用不当的话岂但不能带来性能的晋升,反而会让业务代码变得复杂。

在这里要非常感谢 go-zero 社区中的 @group 和 @寻找,最美的心灵两位同学,他们踊跃地参加到该项目标开发中,并提了许多改良意见。

心愿本篇文章对你有所帮忙,谢谢。

每周一、周四更新

代码仓库: https://github.com/zhoushuguang/lebron

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。

退出移动版