在微服务中开发中,api网关表演对外提供restful api的角色,而api的数据往往会依赖其余服务,简单的api更是会依赖多个甚至数十个服务。尽管单个被依赖服务的耗时个别都比拟低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加。

那么通过什么伎俩来优化呢?咱们首先想到的是通过并发来的形式来解决依赖,这样就能升高整个依赖的耗时,Go根底库中为咱们提供了 WaitGroup 工具用来进行并发管制,但理论业务场景中多个依赖如果有一个出错咱们冀望能立刻返回而不是等所有依赖都执行完再返回后果,而且WaitGroup中对变量的赋值往往须要加锁,每个依赖函数都须要增加Add和Done对于老手来说比拟容易出错

基于以上的背景,go-zero框架中为咱们提供了并发解决工具MapReduce,该工具开箱即用,不须要做什么初始化,咱们通过下图看下应用MapReduce和没应用的耗时比照:

雷同的依赖,串行解决的话须要200ms,应用MapReduce后的耗时等于所有依赖中最大的耗时为100ms,可见MapReduce能够大大降低服务耗时,而且随着依赖的减少成果就会越显著,缩小解决耗时的同时并不会减少服务器压力

并发解决工具MapReduce

MapReduce是Google提出的一个软件架构,用于大规模数据集的并行运算,go-zero中的MapReduce工具正是借鉴了这种架构思维

go-zero框架中的MapReduce工具次要用来对批量数据进行并发的解决,以此来晋升服务的性能

咱们通过几个示例来演示MapReduce的用法

MapReduce次要有三个参数,第一个参数为generate用以生产数据,第二个参数为mapper用以对数据进行解决,第三个参数为reducer用以对mapper后的数据做聚合返回,还能够通过opts选项设置并发解决的线程数量

场景一: 某些性能的后果往往须要依赖多个服务,比方商品详情的后果往往会依赖用户服务、库存服务、订单服务等等,个别被依赖的服务都是以rpc的模式对外提供,为了升高依赖的耗时咱们往往须要对依赖做并行处理

func productDetail(uid, pid int64) (*ProductDetail, error) {    var pd ProductDetail    err := mr.Finish(func() (err error) {        pd.User, err = userRpc.User(uid)        return    }, func() (err error) {        pd.Store, err = storeRpc.Store(pid)        return    }, func() (err error) {        pd.Order, err = orderRpc.Order(pid)        return    })    if err != nil {        log.Printf("product detail error: %v", err)        return nil, err    }    return &pd, nil}

该示例中返回商品详情依赖了多个服务获取数据,因而做并发的依赖解决,对接口的性能有很大的晋升

场景二: 很多时候咱们须要对一批数据进行解决,比方对一批用户id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的后果为效验非法的用户id

func checkLegal(uids []int64) ([]int64, error) {    r, err := mr.MapReduce(func(source chan<- interface{}) {        for _, uid := range uids {            source <- uid        }    }, func(item interface{}, writer mr.Writer, cancel func(error)) {        uid := item.(int64)        ok, err := check(uid)        if err != nil {            cancel(err)        }        if ok {            writer.Write(uid)        }    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {        var uids []int64        for p := range pipe {            uids = append(uids, p.(int64))        }        writer.Write(uids)    })    if err != nil {        log.Printf("check error: %v", err)        return nil, err    }    return r.([]int64), nil}func check(uid int64) (bool, error) {    // do something check user legal    return true, nil}

该示例中,如果check过程呈现谬误则通过cancel办法完结效验过程,并返回error整个效验过程完结,如果某个uid效验后果为false则最终后果不返回该uid

MapReduce应用注意事项

  • mapper和reducer中都能够调用cancel,参数为error,调用后立刻返回,返回后果为nil, error
  • mapper中如果不调用writer.Write则item最终不会被reducer聚合
  • reducer中如果不调用writer.Wirte则返回后果为nil, ErrReduceNoOutput
  • reducer为单线程,所有mapper进去的后果在这里串行聚合

实现原理剖析:

MapReduce中首先通过buildSource办法通过执行generate(参数为无缓冲channel)产生数据,并返回无缓冲的channel,mapper会从该channel中读取数据

func buildSource(generate GenerateFunc) chan interface{} {    source := make(chan interface{})    go func() {        defer close(source)        generate(source)    }()    return source}

在MapReduceWithSource办法中定义了cancel办法,mapper和reducer中都能够调用该办法,调用后主线程收到close信号会立马返回

cancel := once(func(err error) {    if err != nil {        retErr.Set(err)    } else {        // 默认的error        retErr.Set(ErrCancelWithNil)    }    drain(source)    // 调用close(ouput)主线程收到Done信号,立马返回    finish()})

在mapperDispatcher办法中调用了executeMappers,executeMappers生产buildSource产生的数据,每一个item都会起一个goroutine独自解决,默认最大并发数为16,能够通过WithWorkers进行设置

var wg sync.WaitGroupdefer func() {    wg.Wait() // 保障所有的item都解决实现    close(collector)}()pool := make(chan lang.PlaceholderType, workers)writer := newGuardedWriter(collector, done) // 将mapper解决完的数据写入collectorfor {    select {    case <-done: // 当调用了cancel会触发立刻返回        return    case pool <- lang.Placeholder: // 管制最大并发数        item, ok := <-input        if !ok {            <-pool            return        }        wg.Add(1)        go func() {            defer func() {                wg.Done()                <-pool            }()            mapper(item, writer) // 对item进行解决,解决完调用writer.Write把后果写入collector对应的channel中        }()    }}

reducer单goroutine对数mapper写入collector的数据进行解决,如果reducer中没有手动调用writer.Write则最终会执行finish办法对output进行close防止死锁

go func() {    defer func() {        if r := recover(); r != nil {            cancel(fmt.Errorf("%v", r))        } else {            finish()        }    }()    reducer(collector, writer, cancel)}()

在该工具包中还提供了许多针对不同业务场景的办法,实现原理与MapReduce大同小异,感兴趣的同学能够查看源码学习

  • MapReduceVoid 性能和MapReduce相似但没有后果返回只返回error
  • Finish 解决固定数量的依赖,返回error,有一个error立刻返回
  • FinishVoid 和Finish办法性能相似,没有返回值
  • Map 只做generate和mapper解决,返回channel
  • MapVoid 和Map性能相似,无返回

本文次要介绍了go-zero框架中的MapReduce工具,在理论的我的项目中十分实用。用好工具对于晋升服务性能和开发效率都有很大的帮忙,心愿本篇文章能给大家带来一些播种。

我的项目地址

https://github.com/tal-tech/go-zero

好将来技术