缓存是高并发服务的根底,毫不夸大的说没有缓存高并发服务就无从谈起。本我的项目缓存应用Redis,Redis是目前支流的缓存数据库,反对丰盛的数据类型,其中汇合类型的底层次要依赖:整数数组、双向链表、哈希表、压缩列表和跳表五种数据结构。因为底层依赖的数据结构的高效性以及基于多路复用的高性能I/O模型,所以Redis也提供了十分强悍的性能。下图展现了Redis数据类型对应的底层数据结构。

根本应用

在go-zero中默认集成了缓存model数据的性能,咱们在应用goctl主动生成model代码的时候加上 -c 参数即可生成集成缓存的model代码

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:3306)/product" -table="*"  -dir="./model" -c

通过简略的配置咱们就能够应用model层的缓存啦,model层缓存默认过期工夫为7天,如果没有查到数据会设置一个空缓存,空缓存的过期工夫为1分钟,model层cache配置和初始化如下:

CacheRedis:  - Host: 127.0.0.1:6379    Type: node
CategoryModel: model.NewCategoryModel(conn, c.CacheRedis)

这次演示的代码次要会基于product-rpc服务,为了简略咱们间接应用grpcurl来进行调试,留神启动的时候次要注册反射服务,通过goctl主动生成的rpc服务在dev或test环境下曾经帮咱们注册好了,咱们须要把咱们的mode设置为dev,默认的mode为pro,如下代码所示:

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {    product.RegisterProductServer(grpcServer, svr)    if c.Mode == service.DevMode || c.Mode == service.TestMode {      reflection.Register(grpcServer)    }})

间接应用go install装置grpcurl工具,so easy !!!妈妈再也不必放心我不会调试gRPC了

go install github.com/fullstorydev/grpcurl/cmd/grpcurl

启动服务,通过如下命令查问服务,服务提供的办法,能够看到以后提供了Product获取商品详情接口和Products批量获取商品详情接口

~ grpcurl -plaintext 127.0.0.1:8081 listgrpc.health.v1.Healthgrpc.reflection.v1alpha.ServerReflectionproduct.Product~ grpcurl -plaintext 127.0.0.1:8081 list product.Productproduct.Product.Productproduct.Product.Products

咱们先往product表里插入一些测试数据,测试数据放在lebron/sql/data.sql文件中,此时咱们查看id为1的商品数据,这时候缓存中是没有id为1这条数据的

127.0.0.1:6379> EXISTS cache:product:product:id:1(integer) 0

通过grpcurl工具来调用Product接口查问id为1的商品数据,能够看到曾经返回了数据

~ grpcurl -plaintext -d '{"product_id": 1}' 127.0.0.1:8081 product.Product.Product{  "productId": "1",  "name": "夹克1"}

再看redis中曾经存在了id为1的这条数据的缓存,这就是框架给咱们主动生成的缓存

127.0.0.1:6379> get cache:product:product:id:1{\"Id\":1,\"Cateid\":2,\"Name\":\"\xe5\xa4\xb9\xe5\x85\x8b1\",\"Subtitle\":\"\xe5\xa4\xb9\xe5\x85\x8b1\",\"Images\":\"1.jpg,2.jpg,3.jpg\",\"Detail\":\"\xe8\xaf\xa6\xe6\x83\x85\",\"Price\":100,\"Stock\":10,\"Status\":1,\"CreateTime\":\"2022-06-17T17:51:23Z\",\"UpdateTime\":\"2022-06-17T17:51:23Z\"}

咱们再申请id为666的商品,因为咱们表里没有id为666的商品,框架会帮咱们缓存一个空值,这个空值的过期工夫为1分钟

127.0.0.1:6379> get cache:product:product:id:666"*"

当咱们删除数据或者更新数据的时候,以id为key的行记录缓存会被删除

缓存索引

咱们的分类商品列表是须要反对分页的,通过往上滑动能够一直地加载下一页,商品依照创立工夫倒序返回列表,应用游标的形式进行分页。

怎么在缓存中存储分类的商品呢?咱们应用Sorted Set来存储,member为商品的id,即咱们只在Sorted Set中存储缓存索引,查出缓存索引后,因为咱们主动生成了以主键id索引为key的缓存,所以查出索引列表后咱们再查问行记录缓存即可获取商品的详情,Sorted Set的score为商品的创立工夫。

上面咱们一起来剖析分类商品列表的逻辑该怎么写,首先先从缓存中读取当前页的商品id索引,调用cacheProductList办法,留神,这里调用查问缓存办法疏忽了error,为什么要疏忽这个error呢,因为咱们冀望的是尽最大可能的给用户返回数据,也就是redis挂掉了的话那咱们就会从数据库查问数据返回给用户,而不会因为redis挂掉而返回谬误。

pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))

cacheProductList办法实现如下,通过ZrevrangebyscoreWithScoresAndLimitCtx倒序从缓存中读数据,并限度读条数为分页大小

func (l *ProductListLogic) cacheProductList(ctx context.Context, cid int32, cursor, ps int64) ([]int64, error) {  pairs, err := l.svcCtx.BizRedis.ZrevrangebyscoreWithScoresAndLimitCtx(ctx, categoryKey(cid), cursor, 0, 0, int(ps))  if err != nil {    return nil, err  }  var ids []int64  for _, pair := range pairs {    id, _ := strconv.ParseInt(pair.Key, 10, 64)    ids = append(ids, id)  }  return ids, nil}

为了示意列表的完结,咱们会在Sorted Set中设置一个完结标志符,该标志符的member为-1,score为0,所以咱们在从缓存中查出数据后,须要判断数据的最初一条是否为-1,如果为-1的话阐明列表曾经加载到最初一页了,用户再滑动屏幕的话前端就不会再持续申请后端的接口了,逻辑如下,从缓存中查出数据后再依据主键id查问商品的详情即可

pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))if len(pids) == int(in.Ps) {  isCache = true  if pids[len(pids)-1] == -1 {    isEnd = true  }}

如果从缓存中查出的数据为0条,那么咱们就从数据库中查问该分类下的数据,这里要留神从数据库查问数据的时候咱们要限度查问的条数,咱们默认一次查问300条,因为咱们每页大小为10,300条能够让用户下翻30页,大多数状况下用户基本不会翻那么多页,所以咱们不会全副加载以升高咱们的缓存资源,当用户真的翻页超过30页后,咱们再按需加载到缓存中

func (m *defaultProductModel) CategoryProducts(ctx context.Context, cateid, ctime, limit int64) ([]*Product, error) {  var products []*Product  err := m.QueryRowsNoCacheCtx(ctx, &products, fmt.Sprintf("select %s from %s where cateid=? and status=1 and create_time<? order by create_time desc limit ?", productRows, m.table), cateid, ctime, limit)  if err != nil {    return nil, err  }  return products, nil}

获取到当前页的数据后,咱们还须要做去重,因为如果咱们只以createTime作为游标的话,很可能数据会反复,所以咱们还须要加上id作为去重条件,去重逻辑如下

for k, p := range firstPage {      if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {        firstPage = firstPage[k:]        break      }}

最初,如果没有命中缓存的话,咱们须要把从数据库查出的数据写入缓存,这里须要留神的是如果数据曾经到了开端须要加上数据完结的标识符,即val为-1,score为0,这里咱们异步的写会缓存,因为写缓存并不是主逻辑,不须要期待实现,写失败也没有影响呢,通过异步形式升高接口耗时,处处都有小优化呢

if !isCache {    threading.GoSafe(func() {      if len(products) < defaultLimit && len(products) > 0 {        endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")        products = append(products, &model.Product{Id: -1, CreateTime: endTime})      }      _ = l.addCacheProductList(context.Background(), products)    })}

能够看出想要写一个残缺的基于游标分页的逻辑还是比较复杂的,有很多细节须要思考,大家平时在写相似代码时肯定要仔细,该办法的整体代码如下:

func (l *ProductListLogic) ProductList(in *product.ProductListRequest) (*product.ProductListResponse, error) {  _, err := l.svcCtx.CategoryModel.FindOne(l.ctx, int64(in.CategoryId))  if err == model.ErrNotFound {    return nil, status.Error(codes.NotFound, "category not found")  }  if in.Cursor == 0 {    in.Cursor = time.Now().Unix()  }  if in.Ps == 0 {    in.Ps = defaultPageSize  }  var (    isCache, isEnd   bool    lastID, lastTime int64    firstPage        []*product.ProductItem    products         []*model.Product  )  pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))  if len(pids) == int(in.Ps) {    isCache = true    if pids[len(pids)-1] == -1 {      isEnd = true    }    products, err := l.productsByIds(l.ctx, pids)    if err != nil {      return nil, err    }    for _, p := range products {      firstPage = append(firstPage, &product.ProductItem{        ProductId:  p.Id,        Name:       p.Name,        CreateTime: p.CreateTime.Unix(),      })    }  } else {    var (      err   error      ctime = time.Unix(in.Cursor, 0).Format("2006-01-02 15:04:05")    )    products, err = l.svcCtx.ProductModel.CategoryProducts(l.ctx, ctime, int64(in.CategoryId), defaultLimit)    if err != nil {      return nil, err    }    var firstPageProducts []*model.Product    if len(products) > int(in.Ps) {      firstPageProducts = products[:int(in.Ps)]    } else {      firstPageProducts = products      isEnd = true    }    for _, p := range firstPageProducts {      firstPage = append(firstPage, &product.ProductItem{        ProductId:  p.Id,        Name:       p.Name,        CreateTime: p.CreateTime.Unix(),      })    }  }  if len(firstPage) > 0 {    pageLast := firstPage[len(firstPage)-1]    lastID = pageLast.ProductId    lastTime = pageLast.CreateTime    if lastTime < 0 {      lastTime = 0    }    for k, p := range firstPage {      if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {        firstPage = firstPage[k:]        break      }    }  }  ret := &product.ProductListResponse{    IsEnd:     isEnd,    Timestamp: lastTime,    ProductId: lastID,    Products:  firstPage,  }  if !isCache {    threading.GoSafe(func() {      if len(products) < defaultLimit && len(products) > 0 {        endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")        products = append(products, &model.Product{Id: -1, CreateTime: endTime})      }      _ = l.addCacheProductList(context.Background(), products)    })  }  return ret, nil}

咱们通过grpcurl工具申请ProductList接口后返回数据的同时也写进了缓存索引中,当下次再申请的时候就间接从缓存中读取

grpcurl -plaintext -d '{"category_id": 8}' 127.0.0.1:8081 product.Product.ProductList

缓存击穿

缓存击穿是指拜访某个十分热的数据,缓存不存在,导致大量的申请发送到了数据库,这会导致数据库压力陡增,缓存击穿常常产生在热点数据过期生效时,如下图所示:

既然缓存击穿常常产生在热点数据过期生效的时候,那么咱们不让缓存生效不就好了,每次查问缓存的时候不要应用Exists来判断key是否存在,而是应用Expire给缓存续期,通过Expire返回后果判断key是否存在,既然是热点数据通过一直地续期也就不会过期了

还有一种简略无效的办法就是通过singleflight来管制,singleflight的原理是当同时有很多申请同时到来时,最终只有一个申请会最终拜访到资源,其余申请都会期待后果而后返回。获取商品详情应用singleflight进行爱护示例如下:

func (l *ProductLogic) Product(in *product.ProductItemRequest) (*product.ProductItem, error) {  v, err, _ := l.svcCtx.SingleGroup.Do(fmt.Sprintf("product:%d", in.ProductId), func() (interface{}, error) {    return l.svcCtx.ProductModel.FindOne(l.ctx, in.ProductId)  })  if err != nil {    return nil, err  }  p := v.(*model.Product)  return &product.ProductItem{    ProductId: p.Id,    Name:      p.Name,  }, nil}

缓存穿透

缓存穿透是指要拜访的数据既不在缓存中,也不在数据库中,导致申请在拜访缓存时,产生缓存缺失,再去拜访数据库时,发现数据库中也没有要拜访的数据。此时也就没方法从数据库中读出数据再写入缓存来服务后续的申请,相似的申请如果多的话就会给缓存和数据库带来微小的压力。

针对缓存穿透问题,解决办法其实很简略,就是缓存一个空值,防止每次都透传到数据库,缓存的工夫能够设置短一点,比方1分钟,其实上文曾经有提到了,当咱们拜访不存在的数据的时候,go-zero框架会帮咱们主动加上空缓存,比方咱们拜访id为999的商品,该商品在数据库中是不存在的。

grpcurl -plaintext -d '{"product_id": 999}' 127.0.0.1:8081 product.Product.Product

此时查看缓存,曾经帮我增加好了空缓存

127.0.0.1:6379> get cache:product:product:id:999"*"

缓存雪崩

缓存雪崩时指大量的的利用申请无奈在Redis缓存中进行解决,紧接着利用将大量的申请发送到数据库,导致数据库被打挂,好惨呐!!缓存雪崩个别是由两个起因导致的,应答计划也不太一样。

第一个起因是:缓存中有大量的数据同时过期,导致大量的申请无奈失去失常解决。

针对大量数据同时生效带来的缓存雪崩问题,个别的解决方案是要防止大量的数据设置雷同的过期工夫,如果业务上确实有要求数据要同时生效,那么能够在过期工夫上加一个较小的随机数,这样不同的数据过期工夫不同,但差异也不大,防止大量数据同时过期,也根本能满足业务的需要。

第二个起因是:Redis呈现了宕机,没方法失常响应申请了,这就会导致大量申请间接打到数据库,从而产生雪崩

针对这类起因个别咱们须要让咱们的数据库反对熔断,让数据库压力比拟大的时候就触发熔断,抛弃掉局部申请,当然熔断是对业务有损的。

在go-zero的数据库客户端是反对熔断的,如下在ExecCtx办法中应用熔断进行爱护

func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...interface{}) (  result sql.Result, err error) {  ctx, span := startSpan(ctx, "Exec")  defer func() {    endSpan(span, err)  }()  err = db.brk.DoWithAcceptable(func() error {    var conn *sql.DB    conn, err = db.connProv()    if err != nil {      db.onError(err)      return err    }    result, err = exec(ctx, conn, q, args...)    return err  }, db.acceptable)  return}

结束语

本篇文章先介绍了go-zero中缓存应用的根本姿态,接着具体介绍了使游标通过缓存索引来实现分页性能,紧接着介绍了缓存击穿、缓存穿透、缓存雪崩的概念和应答计划。缓存对于高并发零碎来说是重中之重,然而缓存的应用坑还是挺多的,大家在平时我的项目开发中肯定要十分认真,如果使用不当的话岂但不能带来性能的晋升,反而会让业务代码变得复杂。

在这里要非常感谢go-zero社区中的@group和@寻找,最美的心灵两位同学,他们踊跃地参加到该项目标开发中,并提了许多改良意见。

心愿本篇文章对你有所帮忙,谢谢。

每周一、周四更新

代码仓库: https://github.com/zhoushuguang/lebron

我的项目地址

https://github.com/zeromicro/go-zero

欢送应用 go-zerostar 反对咱们!

微信交换群

关注『微服务实际』公众号并点击 交换群 获取社区群二维码。