关于golang:通过MapReduce降低服务响应时间

238次阅读

共计 3773 个字符,预计需要花费 10 分钟才能阅读完成。

在微服务中开发中,api 网关表演对外提供 restful api 的角色,而 api 的数据往往会依赖其余服务,简单的 api 更是会依赖多个甚至数十个服务。尽管单个被依赖服务的耗时个别都比拟低,但如果多个服务串行依赖的话那么整个 api 的耗时将会大大增加。

那么通过什么伎俩来优化呢?咱们首先想到的是通过并发来的形式来解决依赖,这样就能升高整个依赖的耗时,Go 根底库中为咱们提供了 WaitGroup 工具用来进行并发管制,但理论业务场景中多个依赖如果有一个出错咱们冀望能立刻返回而不是等所有依赖都执行完再返回后果,而且 WaitGroup 中对变量的赋值往往须要加锁,每个依赖函数都须要增加 Add 和 Done 对于老手来说比拟容易出错

基于以上的背景,go-zero 框架中为咱们提供了并发解决工具 MapReduce,该工具开箱即用,不须要做什么初始化,咱们通过下图看下应用 MapReduce 和没应用的耗时比照:

雷同的依赖,串行解决的话须要 200ms,应用 MapReduce 后的耗时等于所有依赖中最大的耗时为 100ms,可见 MapReduce 能够大大降低服务耗时,而且随着依赖的减少成果就会越显著,缩小解决耗时的同时并不会减少服务器压力

并发解决工具 MapReduce

MapReduce 是 Google 提出的一个软件架构,用于大规模数据集的并行运算,go-zero 中的 MapReduce 工具正是借鉴了这种架构思维

go-zero 框架中的 MapReduce 工具次要用来对批量数据进行并发的解决,以此来晋升服务的性能

咱们通过几个示例来演示 MapReduce 的用法

MapReduce 次要有三个参数,第一个参数为 generate 用以生产数据,第二个参数为 mapper 用以对数据进行解决,第三个参数为 reducer 用以对 mapper 后的数据做聚合返回,还能够通过 opts 选项设置并发解决的线程数量

场景一: 某些性能的后果往往须要依赖多个服务,比方商品详情的后果往往会依赖用户服务、库存服务、订单服务等等,个别被依赖的服务都是以 rpc 的模式对外提供,为了升高依赖的耗时咱们往往须要对依赖做并行处理

func productDetail(uid, pid int64) (*ProductDetail, error) {
    var pd ProductDetail
    err := mr.Finish(func() (err error) {pd.User, err = userRpc.User(uid)
        return
    }, func() (err error) {pd.Store, err = storeRpc.Store(pid)
        return
    }, func() (err error) {pd.Order, err = orderRpc.Order(pid)
        return
    })

    if err != nil {log.Printf("product detail error: %v", err)
        return nil, err
    }

    return &pd, nil
}

该示例中返回商品详情依赖了多个服务获取数据,因而做并发的依赖解决,对接口的性能有很大的晋升

场景二: 很多时候咱们须要对一批数据进行解决,比方对一批用户 id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的后果为效验非法的用户 id

func checkLegal(uids []int64) ([]int64, error) {r, err := mr.MapReduce(func(source chan<- interface{}) {
        for _, uid := range uids {source <- uid}
    }, func(item interface{}, writer mr.Writer, cancel func(error)) {uid := item.(int64)
        ok, err := check(uid)
        if err != nil {cancel(err)
        }
        if ok {writer.Write(uid)
        }
    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {var uids []int64
        for p := range pipe {uids = append(uids, p.(int64))
        }
        writer.Write(uids)
    })
    if err != nil {log.Printf("check error: %v", err)
        return nil, err
    }

    return r.([]int64), nil
}

func check(uid int64) (bool, error) {
    // do something check user legal
    return true, nil
}

该示例中,如果 check 过程呈现谬误则通过 cancel 办法完结效验过程,并返回 error 整个效验过程完结,如果某个 uid 效验后果为 false 则最终后果不返回该 uid

MapReduce 应用注意事项

  • mapper 和 reducer 中都能够调用 cancel,参数为 error,调用后立刻返回,返回后果为 nil, error
  • mapper 中如果不调用 writer.Write 则 item 最终不会被 reducer 聚合
  • reducer 中如果不调用 writer.Wirte 则返回后果为 nil, ErrReduceNoOutput
  • reducer 为单线程,所有 mapper 进去的后果在这里串行聚合

实现原理剖析:

MapReduce 中首先通过 buildSource 办法通过执行 generate(参数为无缓冲 channel) 产生数据,并返回无缓冲的 channel,mapper 会从该 channel 中读取数据

func buildSource(generate GenerateFunc) chan interface{} {source := make(chan interface{})
    go func() {defer close(source)
        generate(source)
    }()

    return source
}

在 MapReduceWithSource 办法中定义了 cancel 办法,mapper 和 reducer 中都能够调用该办法,调用后主线程收到 close 信号会立马返回

cancel := once(func(err error) {
    if err != nil {retErr.Set(err)
    } else {
        // 默认的 error
        retErr.Set(ErrCancelWithNil)
    }

    drain(source)
    // 调用 close(ouput) 主线程收到 Done 信号,立马返回
    finish()})

在 mapperDispatcher 办法中调用了 executeMappers,executeMappers 生产 buildSource 产生的数据,每一个 item 都会起一个 goroutine 独自解决,默认最大并发数为 16,能够通过 WithWorkers 进行设置

var wg sync.WaitGroup
defer func() {wg.Wait() // 保障所有的 item 都解决实现
    close(collector)
}()

pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 将 mapper 解决完的数据写入 collector
for {
    select {
    case <-done: // 当调用了 cancel 会触发立刻返回
        return
    case pool <- lang.Placeholder: // 管制最大并发数
        item, ok := <-input
        if !ok {
            <-pool
            return
        }

        wg.Add(1)
        go func() {defer func() {wg.Done()
                <-pool
            }()

            mapper(item, writer) // 对 item 进行解决,解决完调用 writer.Write 把后果写入 collector 对应的 channel 中
        }()}
}

reducer 单 goroutine 对数 mapper 写入 collector 的数据进行解决,如果 reducer 中没有手动调用 writer.Write 则最终会执行 finish 办法对 output 进行 close 防止死锁

go func() {defer func() {if r := recover(); r != nil {cancel(fmt.Errorf("%v", r))
        } else {finish()
        }
    }()
    reducer(collector, writer, cancel)
}()

在该工具包中还提供了许多针对不同业务场景的办法,实现原理与 MapReduce 大同小异,感兴趣的同学能够查看源码学习

  • MapReduceVoid 性能和 MapReduce 相似但没有后果返回只返回 error
  • Finish 解决固定数量的依赖,返回 error,有一个 error 立刻返回
  • FinishVoid 和 Finish 办法性能相似,没有返回值
  • Map 只做 generate 和 mapper 解决,返回 channel
  • MapVoid 和 Map 性能相似,无返回

本文次要介绍了 go-zero 框架中的 MapReduce 工具,在理论的我的项目中十分实用。用好工具对于晋升服务性能和开发效率都有很大的帮忙,心愿本篇文章能给大家带来一些播种。

我的项目地址

https://github.com/tal-tech/go-zero

好将来技术

正文完
 0