转自go-zero点击查看原文
go-zero微服务库地址https://github.com/tal-tech/go-zero

数据的流解决利器

流解决(Stream processing)是一种计算机编程范式,其容许给定一个数据序列(流解决数据源),一系列数据操作(函数)被利用到流中的每个元素。同时流解决工具能够显著进步程序员的开发效率,容许他们编写无效、洁净和简洁的代码。

流数据处理在咱们的日常工作中十分常见,举个例子,咱们在业务开发中往往会记录许多业务日志,这些日志个别是先发送到Kafka,而后再由Job生产Kafaka写到elasticsearch,在进行日志流解决的过程中,往往还会对日志做一些解决,比方过滤有效的日志,做一些计算以及重新组合日志等等,示意图如下:

流解决工具fx

gozero是一个性能齐备的微服务框架,框架中内置了很多十分实用的工具,其中就蕴含流数据处理工具fx,上面咱们通过一个简略的例子来意识下该工具:

package mainimport (    "fmt"    "os"    "os/signal"    "syscall"    "time"    "github.com/tal-tech/go-zero/core/fx")func main() {    ch := make(chan int)    go inputStream(ch)    go outputStream(ch)    c := make(chan os.Signal, 1)    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)    <-c}func inputStream(ch chan int) {    count := 0    for {        ch <- count        time.Sleep(time.Millisecond * 500)        count++    }}func outputStream(ch chan int) {    fx.From(func(source chan<- interface{}) {        for c := range ch {            source <- c        }    }).Walk(func(item interface{}, pipe chan<- interface{}) {        count := item.(int)        pipe <- count    }).Filter(func(item interface{}) bool {        itemInt := item.(int)        if itemInt%2 == 0 {            return true        }        return false    }).ForEach(func(item interface{}) {        fmt.Println(item)    })}

inputStream函数模仿了流数据的产生,outputStream函数模仿了流数据的处理过程,其中From函数为流的输出,Walk函数并发的作用在每一个item上,Filter函数对item进行过滤为true保留为false不保留,ForEach函数遍历输入每一个item元素。

流数据处理两头操作

一个流的数据处理可能存在许多的两头操作,每个两头操作都能够作用在流上。就像流水线上的工人一样,每个工人操作完整机后都会返回解决实现的新整机,同理流解决两头操作实现后也会返回一个新的流。

fx的流解决两头操作:

操作函数性能输出
Distinct去除反复的itemKeyFunc,返回须要去重的key
Filter过滤不满足条件的itemFilterFunc,Option管制并发量
Group对item进行分组KeyFunc,以key进行分组
Head取出前n个item,返回新streamint64保留数量
Map对象转换MapFunc,Option管制并发量
Merge合并item到slice并生成新stream
Reverse反转item
Sort对item进行排序LessFunc实现排序算法
Tail与Head性能相似,取出后n个item组成新streamint64保留数量
Walk作用在每个item上WalkFunc,Option管制并发量

下图展现了每个步骤和每个步骤的后果:

用法与原理剖析

From

通过From函数构建流并返回Stream,流数据通过channel进行存储:

// 例子s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {  for _, v := range s {    source <- v  }})// 源码func From(generate GenerateFunc) Stream {    source := make(chan interface{})    go func() {        defer close(source)    // 结构流数据写入channel        generate(source)    }()    return Range(source)}
Filter

Filter函数提供过滤item的性能,FilterFunc定义过滤逻辑true保留item,false则不保留:

// 例子 保留偶数s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}fx.From(func(source chan<- interface{}) {  for _, v := range s {    source <- v  }}).Filter(func(item interface{}) bool {  if item.(int)%2 == 0 {    return true  }  return false})// 源码func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {    return p.Walk(func(item interface{}, pipe chan<- interface{}) {    // 执行过滤函数true保留,false抛弃        if fn(item) {            pipe <- item        }    }, opts...)}
Group

Group对流数据进行分组,需定义分组的key,数据分组后以slice存入channel:

// 例子 依照首字符"g"或者"p"分组,没有则分到另一组    ss := []string{"golang", "google", "php", "python", "java", "c++"}    fx.From(func(source chan<- interface{}) {        for _, s := range ss {            source <- s        }    }).Group(func(item interface{}) interface{} {        if strings.HasPrefix(item.(string), "g") {            return "g"        } else if strings.HasPrefix(item.(string), "p") {            return "p"        }        return ""    }).ForEach(func(item interface{}) {        fmt.Println(item)    })}// 源码func (p Stream) Group(fn KeyFunc) Stream {  // 定义分组存储map    groups := make(map[interface{}][]interface{})    for item := range p.source {    // 用户自定义分组key        key := fn(item)    // key雷同分到一组        groups[key] = append(groups[key], item)    }    source := make(chan interface{})    go func() {        for _, group := range groups {      // 雷同key的一组数据写入到channel            source <- group        }        close(source)    }()    return Range(source)}
Reverse

reverse能够对流中元素进行反转解决:

// 例子fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {  fmt.Println(item)})// 源码func (p Stream) Reverse() Stream {    var items []interface{}  // 获取流中数据    for item := range p.source {        items = append(items, item)    }    // 反转算法    for i := len(items)/2 - 1; i >= 0; i-- {        opp := len(items) - 1 - i        items[i], items[opp] = items[opp], items[i]    }      // 写入流    return Just(items...)}
Distinct

distinct对流中元素进行去重,去重在业务开发中比拟罕用,常常须要对用户id等做去重操作:

// 例子fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {  return item}).ForEach(func(item interface{}) {  fmt.Println(item)})// 后果为 1,2,3,4,5,6// 源码func (p Stream) Distinct(fn KeyFunc) Stream {    source := make(chan interface{})    threading.GoSafe(func() {        defer close(source)        // 通过key进行去重,雷同key只保留一个        keys := make(map[interface{}]lang.PlaceholderType)        for item := range p.source {            key := fn(item)      // key存在则不保留            if _, ok := keys[key]; !ok {                source <- item                keys[key] = lang.Placeholder            }        }    })    return Range(source)}
Walk

Walk函数并发的作用在流中每一个item上,能够通过WithWorkers设置并发数,默认并发数为16,最小并发数为1,如设置unlimitedWorkers为true则并发数无限度,但并发写入流中的数据由defaultWorkers限度,WalkFunc中用户能够自定义后续写入流中的元素,能够不写入也能够写入多个元素:

// 例子fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {  newItem := strings.ToUpper(item.(string))  pipe <- newItem}).ForEach(func(item interface{}) {  fmt.Println(item)})// 源码func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {    pipe := make(chan interface{}, option.workers)    go func() {        var wg sync.WaitGroup        pool := make(chan lang.PlaceholderType, option.workers)        for {      // 管制并发数量            pool <- lang.Placeholder            item, ok := <-p.source            if !ok {                <-pool                break            }            wg.Add(1)            go func() {                defer func() {                    wg.Done()                    <-pool                }()                // 作用在每个元素上                fn(item, pipe)            }()        }    // 期待解决实现        wg.Wait()        close(pipe)    }()    return Range(pipe)}

并发解决

fx工具除了进行流数据处理以外还提供了函数并发性能,在微服务中实现某个性能往往须要依赖多个服务,并发的解决依赖能够无效的升高依赖耗时,晋升服务的性能。

fx.Parallel(func() {  userRPC() // 依赖1}, func() {  accountRPC() // 依赖2}, func() {  orderRPC() // 依赖3})

留神fx.Parallel进行依赖并行处理的时候不会有error返回,如需有error返回或者有一个依赖报错须要立马完结依赖申请请应用MapReduce工具进行解决。

总结

本篇文章介绍了流解决的基本概念和gozero中的流解决工具fx,在理论的生产中流解决场景利用也十分多,心愿本篇文章能给大家带来肯定的启发,更好的应答工作中的流解决场景。