业务场景
在做工作开发的时候,你们肯定会碰到以下场景:
场景 1:调用第三方接口的时候,一个需要你须要调用不同的接口,做数据组装。
场景 2: 一个利用首页可能依靠于很多服务。那就波及到在加载页面时须要同时申请多个服务的接口。这一步往往是由后端对立调用组装数据再返回给前端,也就是所谓的 BFF(Backend For Frontend) 层。
针对以上两种场景,假如在没有强依赖关系下,抉择串行调用,那么总耗时即:
time=s1+s2+....sn
依照当代秒入百万的有为青年,这么长时间早就把你祖宗十八代问候了一遍。
为了平凡的 KPI,咱们往往会抉择并发地调用这些依赖接口。那么总耗时就是:
time=max(s1,s2,s3.....,sn)
当然开始堆业务的时候能够先串行化,等到下面的人焦急的时候,亮出绝招。
这样,年底 PPT 就能够加上浓厚的一笔流水账: 为业务某个接口进步百分之 XXX 性能,间接产生 XXX 价值。
当然这所有的前提是,做老板不懂技术,做技术”懂”你。
言归正传, 如果批改成并发调用,你可能会这么写,
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var userInfo *User
var productList []Product
go func() {defer wg.Done()
userInfo, _ = getUser()}()
go func() {defer wg.Done()
productList, _ = getProductList()}()
wg.Wait()
fmt.Printf("用户信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)
}
/******** 用户服务 **********/
type User struct {
Name string
Age uint8
}
func getUser() (*User, error) {time.Sleep(500 * time.Millisecond)
var u User
u.Name = "wuqinqiang"
u.Age = 18
return &u, nil
}
/******** 商品服务 **********/
type Product struct {
Title string
Price uint32
}
func getProductList() ([]Product, error) {time.Sleep(400 * time.Millisecond)
var list []Product
list = append(list, Product{
Title: "SHib",
Price: 10,
})
return list, nil
}
先不论其余问题。从实现上来说,须要多少服务,你会开多少个 G
,利用 sync.WaitGroup
的个性,
实现并发编排工作的成果。
如同,问题不大。
然而随着代号 996
业务场景的减少,你会发现,好多模块都有类似的性能,只是对应的业务场景不同而已。
那么咱们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。
安顿。
应用
本着不反复造轮子的准则,去搜了下开源我的项目,最终看上了 go-zero
外面的一个工具 mapreduce
。
从文件名咱们能看进去是什么了,能够自行 Google
这个名词。
应用很简略。咱们通过它革新一下下面的代码:
package main
import (
"fmt"
"github.com/tal-tech/go-zero/core/mr"
"time"
)
func main() {
var userInfo *User
var productList []Product
_ = mr.Finish(func() (err error) {userInfo, err = getUser()
return err
}, func() (err error) {productList, err = getProductList()
return err
})
fmt.Printf("用户信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)
}
用户信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]
是不是难受多了。
然而这里还须要留神一点,假如你调用的其中一个服务谬误,并且你 return err
对应的谬误,那么其余调用的服务会被勾销。
比方咱们批改 getProductList 间接响应谬误。
func getProductList() ([]Product, error) {return nil, errors.New("test error")
}
// 打印
用户信息:<nil>
商品信息:[]
那么最终打印的时候连用户信息都会为空,因为呈现一个服务谬误,用户服务申请被勾销了。
个别状况下,在申请服务谬误的时候咱们会有保底操作,一个服务谬误不能影响其余申请的后果。
所以在应用的时候具体解决取决于业务场景。
源码
既然用了,那么就追下源码吧。
func Finish(fns ...func() error) error {if len(fns) == 0 {return nil}
return MapReduceVoid(func(source chan<- interface{}) {
for _, fn := range fns {source <- fn}
}, func(item interface{}, writer Writer, cancel func(error)) {fn := item.(func() error)
if err := fn(); err != nil {cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {drain(pipe)
}, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {reducer(input, cancel)
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
对于 MapReduceVoid
函数,次要查看三个闭包参数。
- 第一个
GenerateFunc
用于生产数据。 MapperFunc
读取生产出的数据,进行解决。VoidReducerFunc
这里示意不对mapper
后的数据做聚合返回。所以这个闭包在此操作简直 0 作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {source := buildSource(generate)
return MapReduceWithSource(source, mapper, reducer, opts...)
}
func buildSource(generate GenerateFunc) chan interface{} {source := make(chan interface{})// 创立无缓冲通道
threading.GoSafe(func() {defer close(source)
generate(source) // 开始生产数据
})
return source // 返回无缓冲通道
}
buildSource
函数中,返回一个无缓冲的通道。并开启一个 G
运行 generate(source)
,往无缓冲通道塞数据。这个 generate(source)
不就是一开始 Finish
传递的第一个闭包参数。
return MapReduceVoid(func(source chan<- interface{}) {
// 就这个
for _, fn := range fns {source <- fn}
})
而后查看 MapReduceWithSource
函数,
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {options := buildOptions(opts...)
// 工作执行完结告诉信号
output := make(chan interface{})
// 将 mapper 解决完的数据写入 collector
collector := make(chan interface{}, options.workers)
// 勾销操作信号
done := syncx.NewDoneChan()
writer := newGuardedWriter(output, done.Done())
var closeOnce sync.Once
var retErr errorx.AtomicError
finish := func() {closeOnce.Do(func() {done.Close()
close(output)
})
}
cancel := once(func(err error) {
if err != nil {retErr.Set(err)
} else {retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()})
go func() {defer func() {if r := recover(); r != nil {cancel(fmt.Errorf("%v", r))
} else {finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
// 真正从生成器通道取数据执行 Mapper
go executeMappers(func(item interface{}, w Writer) {mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
value, ok := <-output
if err := retErr.Load(); err != nil {return nil, err} else if ok {return value, nil} else {return nil, ErrReduceNoOutput}
}
这段代码挺长的,咱们说下外围的点。咱们看到应用一个 G
调用 executeMappers
办法。
go executeMappers(func(item interface{}, w Writer) {mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, workers int) {
var wg sync.WaitGroup
defer func() {
// 期待所有工作全副执行结束
wg.Wait()
// 敞开通道
close(collector)
}()
// 依据指定数量创立 worker 池
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done)
for {
select {
case <-done:
return
case pool <- lang.Placeholder:
// 从 buildSource() 返回的无缓冲通道取数据
item, ok := <-input
// 当通道敞开,完结
if !ok {
<-pool
return
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {defer func() {wg.Done()
<-pool
}()
// 真正运行闭包函数的中央
// func(item interface{}, w Writer) {// mapper(item, w, cancel)
// }
mapper(item, writer)
})
}
}
}
具体的逻辑已备注,代码很容易懂。
一旦 executeMappers
函数返回,敞开 collector
通道,那么执行 reducer
不再阻塞。
go func() {defer func() {if r := recover(); r != nil {cancel(fmt.Errorf("%v", r))
} else {finish()
}
}()
reducer(collector, writer, cancel)
// 这里
drain(collector)
}()
这里的 reducer(collector, writer, cancel)
其实就是从 MapReduceVoid
传递的第三个闭包函数。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {reducer(input, cancel)
// 这里
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
而后这个闭包函数又执行了 reducer(input, cancel)
,这里的 reducer
就是咱们一开始解释过的 VoidReducerFunc
,从 Finish() 而来
。
等等, 看到下面三个中央的 drain(input)
了吗?
// drain drains the channel.
func drain(channel <-chan interface{}) {
// drain the channel
for range channel {}}
其实就是一个排空 channel
的操作,然而三个中央都对同一个 channel
, 也是让我费解。
还有更重要的一点。
go func() {defer func() {if r := recover(); r != nil {cancel(fmt.Errorf("%v", r))
} else {finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
下面的代码,如果执行 reducer
,writer
写入引发 panic
, 那么 drain(collector)
会间接卡住。
不过作者曾经修复了这个问题,间接把 drain(collector)
放入到 defer
。
具体 issues[1]。
到这里,对于 Finish
的源码也就完结了。感兴趣的能够看看其余源码。
很喜爱 go-zero
里的一些工具,然而往往用的一些工具并不独立,
依赖于其余文件包,导致明明只想应用其中一个工具却须要装置整个包。
所以最终的后果就是扒源码,创立无依赖库工具集,遵循 MIT
即可。
附录
[1]
https://github.com/tal-tech/g…