业务场景

在做工作开发的时候,你们肯定会碰到以下场景:

场景1:调用第三方接口的时候, 一个需要你须要调用不同的接口,做数据组装。
场景2:一个利用首页可能依靠于很多服务。那就波及到在加载页面时须要同时申请多个服务的接口。这一步往往是由后端对立调用组装数据再返回给前端,也就是所谓的 BFF(Backend For Frontend) 层。

针对以上两种场景,假如在没有强依赖关系下,抉择串行调用,那么总耗时即:

time=s1+s2+....sn

依照当代秒入百万的有为青年,这么长时间早就把你祖宗十八代问候了一遍。

为了平凡的KPI,咱们往往会抉择并发地调用这些依赖接口。那么总耗时就是:

time=max(s1,s2,s3.....,sn)

当然开始堆业务的时候能够先串行化,等到下面的人焦急的时候,亮出绝招。

这样,年底 PPT 就能够加上浓厚的一笔流水账:为业务某个接口进步百分之XXX性能,间接产生XXX价值。

当然这所有的前提是,做老板不懂技术,做技术”懂”你。

言归正传,如果批改成并发调用,你可能会这么写,

package mainimport (    "fmt"    "sync"    "time")func main() {    var wg sync.WaitGroup    wg.Add(2)    var userInfo *User    var productList []Product        go func() {        defer wg.Done()        userInfo, _ = getUser()    }()    go func() {        defer wg.Done()        productList, _ = getProductList()    }()    wg.Wait()    fmt.Printf("用户信息:%+v\n", userInfo)    fmt.Printf("商品信息:%+v\n", productList)}/********用户服务**********/type User struct {    Name string    Age  uint8}func getUser() (*User, error) {    time.Sleep(500 * time.Millisecond)    var u User    u.Name = "wuqinqiang"    u.Age = 18    return &u, nil}/********商品服务**********/type Product struct {    Title string    Price uint32}func getProductList() ([]Product, error) {    time.Sleep(400 * time.Millisecond)    var list []Product    list = append(list, Product{        Title: "SHib",        Price: 10,    })    return list, nil}

先不论其余问题。从实现上来说,须要多少服务,你会开多少个 G,利用 sync.WaitGroup 的个性,
实现并发编排工作的成果。

如同,问题不大。

然而随着代号 996 业务场景的减少,你会发现,好多模块都有类似的性能,只是对应的业务场景不同而已。

那么咱们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。

安顿。

应用

本着不反复造轮子的准则,去搜了下开源我的项目,最终看上了 go-zero 外面的一个工具 mapreduce
从文件名咱们能看进去是什么了,能够自行 Google 这个名词。

应用很简略。咱们通过它革新一下下面的代码:

package mainimport (    "fmt"    "github.com/tal-tech/go-zero/core/mr"    "time")func main() {    var userInfo *User    var productList []Product    _ = mr.Finish(func() (err error) {        userInfo, err = getUser()        return err    }, func() (err error) {        productList, err = getProductList()        return err    })    fmt.Printf("用户信息:%+v\n", userInfo)    fmt.Printf("商品信息:%+v\n", productList)}
用户信息:&{Name:wuqinqiang Age:18}商品信息:[{Title:SHib Price:10}]

是不是难受多了。

然而这里还须要留神一点,假如你调用的其中一个服务谬误,并且你 return err 对应的谬误,那么其余调用的服务会被勾销。
比方咱们批改 getProductList 间接响应谬误。

func getProductList() ([]Product, error) {    return nil, errors.New("test error")}//打印用户信息:<nil>商品信息:[]

那么最终打印的时候连用户信息都会为空,因为呈现一个服务谬误,用户服务申请被勾销了。

个别状况下,在申请服务谬误的时候咱们会有保底操作,一个服务谬误不能影响其余申请的后果。
所以在应用的时候具体解决取决于业务场景。

源码

既然用了,那么就追下源码吧。

func Finish(fns ...func() error) error {    if len(fns) == 0 {        return nil    }    return MapReduceVoid(func(source chan<- interface{}) {        for _, fn := range fns {            source <- fn        }    }, func(item interface{}, writer Writer, cancel func(error)) {        fn := item.(func() error)        if err := fn(); err != nil {            cancel(err)        }    }, func(pipe <-chan interface{}, cancel func(error)) {        drain(pipe)    }, WithWorkers(len(fns)))}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {        reducer(input, cancel)        drain(input)        // We need to write a placeholder to let MapReduce to continue on reducer done,        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.        writer.Write(lang.Placeholder)    }, opts...)    return err}

对于 MapReduceVoid函数,次要查看三个闭包参数。

  • 第一个 GenerateFunc 用于生产数据。
  • MapperFunc 读取生产出的数据,进行解决。
  • VoidReducerFunc 这里示意不对 mapper 后的数据做聚合返回。所以这个闭包在此操作简直0作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {    source := buildSource(generate)     return MapReduceWithSource(source, mapper, reducer, opts...)}func buildSource(generate GenerateFunc) chan interface{} {    source := make(chan interface{})// 创立无缓冲通道    threading.GoSafe(func() {        defer close(source)        generate(source) //开始生产数据    })    return source //返回无缓冲通道}

buildSource函数中,返回一个无缓冲的通道。并开启一个 G 运行 generate(source),往无缓冲通道塞数据。 这个generate(source) 不就是一开始 Finish 传递的第一个闭包参数。

return MapReduceVoid(func(source chan<- interface{}) {    // 就这个        for _, fn := range fns {            source <- fn        }    })

而后查看 MapReduceWithSource 函数,

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,    opts ...Option) (interface{}, error) {    options := buildOptions(opts...)    //工作执行完结告诉信号    output := make(chan interface{})    //将mapper解决完的数据写入collector    collector := make(chan interface{}, options.workers)    // 勾销操作信号    done := syncx.NewDoneChan()    writer := newGuardedWriter(output, done.Done())    var closeOnce sync.Once    var retErr errorx.AtomicError    finish := func() {        closeOnce.Do(func() {            done.Close()            close(output)        })    }    cancel := once(func(err error) {        if err != nil {            retErr.Set(err)        } else {            retErr.Set(ErrCancelWithNil)        }        drain(source)        finish()    })    go func() {        defer func() {            if r := recover(); r != nil {                cancel(fmt.Errorf("%v", r))            } else {                finish()            }        }()        reducer(collector, writer, cancel)        drain(collector)    }()    // 真正从生成器通道取数据执行Mapper    go executeMappers(func(item interface{}, w Writer) {        mapper(item, w, cancel)    }, source, collector, done.Done(), options.workers)    value, ok := <-output    if err := retErr.Load(); err != nil {        return nil, err    } else if ok {        return value, nil    } else {        return nil, ErrReduceNoOutput    }}

这段代码挺长的,咱们说下外围的点。咱们看到应用一个G 调用 executeMappers 办法。

go executeMappers(func(item interface{}, w Writer) {        mapper(item, w, cancel)    }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},    done <-chan lang.PlaceholderType, workers int) {    var wg sync.WaitGroup    defer func() {        // 期待所有工作全副执行结束        wg.Wait()        // 敞开通道        close(collector)    }()   //依据指定数量创立 worker池    pool := make(chan lang.PlaceholderType, workers)     writer := newGuardedWriter(collector, done)    for {        select {        case <-done:            return        case pool <- lang.Placeholder:            // 从buildSource() 返回的无缓冲通道取数据            item, ok := <-input             // 当通道敞开,完结            if !ok {                <-pool                return            }            wg.Add(1)            // better to safely run caller defined method            threading.GoSafe(func() {                defer func() {                    wg.Done()                    <-pool                }()                //真正运行闭包函数的中央               // func(item interface{}, w Writer) {               //    mapper(item, w, cancel)               //    }                mapper(item, writer)            })        }    }}

具体的逻辑已备注,代码很容易懂。

一旦 executeMappers 函数返回,敞开 collector 通道,那么执行 reducer 不再阻塞。

go func() {        defer func() {            if r := recover(); r != nil {                cancel(fmt.Errorf("%v", r))            } else {                finish()            }        }()        reducer(collector, writer, cancel)        //这里        drain(collector)    }()

这里的 reducer(collector, writer, cancel) 其实就是从 MapReduceVoid 传递的第三个闭包函数。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {    _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {        reducer(input, cancel)        //这里        drain(input)        // We need to write a placeholder to let MapReduce to continue on reducer done,        // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.        writer.Write(lang.Placeholder)    }, opts...)    return err}

而后这个闭包函数又执行了 reducer(input, cancel),这里的 reducer 就是咱们一开始解释过的 VoidReducerFunc,从 Finish() 而来

等等,看到下面三个中央的 drain(input)了吗?

// drain drains the channel.func drain(channel <-chan interface{}) {    // drain the channel    for range channel {    }}

其实就是一个排空 channel 的操作,然而三个中央都对同一个 channel,也是让我费解。

还有更重要的一点。

go func() {        defer func() {            if r := recover(); r != nil {                cancel(fmt.Errorf("%v", r))            } else {                finish()            }        }()        reducer(collector, writer, cancel)        drain(collector)    }()

下面的代码,如果执行 reducerwriter 写入引发 panic,那么drain(collector) 会间接卡住。

不过作者曾经修复了这个问题,间接把 drain(collector) 放入到 defer

具体 issues[1]。

到这里,对于 Finish 的源码也就完结了。感兴趣的能够看看其余源码。

很喜爱 go-zero 里的一些工具,然而往往用的一些工具并不独立,
依赖于其余文件包,导致明明只想应用其中一个工具却须要装置整个包。
所以最终的后果就是扒源码,创立无依赖库工具集,遵循 MIT 即可。

附录
[1]
https://github.com/tal-tech/g...