关于golang:gozero-数据的流处理利器-fx

转自go-zero点击查看原文
go-zero微服务库地址https://github.com/tal-tech/go-zero

数据的流解决利器

流解决(Stream processing)是一种计算机编程范式,其容许给定一个数据序列(流解决数据源),一系列数据操作(函数)被利用到流中的每个元素。同时流解决工具能够显著进步程序员的开发效率,容许他们编写无效、洁净和简洁的代码。

流数据处理在咱们的日常工作中十分常见,举个例子,咱们在业务开发中往往会记录许多业务日志,这些日志个别是先发送到Kafka,而后再由Job生产Kafaka写到elasticsearch,在进行日志流解决的过程中,往往还会对日志做一些解决,比方过滤有效的日志,做一些计算以及重新组合日志等等,示意图如下:

流解决工具fx

gozero是一个性能齐备的微服务框架,框架中内置了很多十分实用的工具,其中就蕴含流数据处理工具fx,上面咱们通过一个简略的例子来意识下该工具:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/tal-tech/go-zero/core/fx"
)

func main() {
    ch := make(chan int)

    go inputStream(ch)
    go outputStream(ch)

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}

func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}

func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}

inputStream函数模仿了流数据的产生,outputStream函数模仿了流数据的处理过程,其中From函数为流的输出,Walk函数并发的作用在每一个item上,Filter函数对item进行过滤为true保留为false不保留,ForEach函数遍历输入每一个item元素。

流数据处理两头操作

一个流的数据处理可能存在许多的两头操作,每个两头操作都能够作用在流上。就像流水线上的工人一样,每个工人操作完整机后都会返回解决实现的新整机,同理流解决两头操作实现后也会返回一个新的流。

fx的流解决两头操作:

操作函数 性能 输出
Distinct 去除反复的item KeyFunc,返回须要去重的key
Filter 过滤不满足条件的item FilterFunc,Option管制并发量
Group 对item进行分组 KeyFunc,以key进行分组
Head 取出前n个item,返回新stream int64保留数量
Map 对象转换 MapFunc,Option管制并发量
Merge 合并item到slice并生成新stream
Reverse 反转item
Sort 对item进行排序 LessFunc实现排序算法
Tail 与Head性能相似,取出后n个item组成新stream int64保留数量
Walk 作用在每个item上 WalkFunc,Option管制并发量

下图展现了每个步骤和每个步骤的后果:

用法与原理剖析

From

通过From函数构建流并返回Stream,流数据通过channel进行存储:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
})

// 源码
func From(generate GenerateFunc) Stream {
    source := make(chan interface{})

    go func() {
        defer close(source)
    // 结构流数据写入channel
        generate(source)
    }()

    return Range(source)
}
Filter

Filter函数提供过滤item的性能,FilterFunc定义过滤逻辑true保留item,false则不保留:

// 例子 保留偶数
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {
    source <- v
  }
}).Filter(func(item interface{}) bool {
  if item.(int)%2 == 0 {
    return true
  }
  return false
})

// 源码
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
    return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 执行过滤函数true保留,false抛弃
        if fn(item) {
            pipe <- item
        }
    }, opts...)
}
Group

Group对流数据进行分组,需定义分组的key,数据分组后以slice存入channel:

// 例子 依照首字符"g"或者"p"分组,没有则分到另一组
    ss := []string{"golang", "google", "php", "python", "java", "c++"}
    fx.From(func(source chan<- interface{}) {
        for _, s := range ss {
            source <- s
        }
    }).Group(func(item interface{}) interface{} {
        if strings.HasPrefix(item.(string), "g") {
            return "g"
        } else if strings.HasPrefix(item.(string), "p") {
            return "p"
        }
        return ""
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}

// 源码
func (p Stream) Group(fn KeyFunc) Stream {
  // 定义分组存储map
    groups := make(map[interface{}][]interface{})
    for item := range p.source {
    // 用户自定义分组key
        key := fn(item)
    // key雷同分到一组
        groups[key] = append(groups[key], item)
    }

    source := make(chan interface{})
    go func() {
        for _, group := range groups {
      // 雷同key的一组数据写入到channel
            source <- group
        }
        close(source)
    }()

    return Range(source)
}
Reverse

reverse能够对流中元素进行反转解决:

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源码
func (p Stream) Reverse() Stream {
    var items []interface{}
  // 获取流中数据
    for item := range p.source {
        items = append(items, item)
    }
    // 反转算法
    for i := len(items)/2 - 1; i >= 0; i-- {
        opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
    }
    
  // 写入流
    return Just(items...)
}
Distinct

distinct对流中元素进行去重,去重在业务开发中比拟罕用,常常须要对用户id等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
  return item
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})
// 后果为 1,2,3,4,5,6

// 源码
func (p Stream) Distinct(fn KeyFunc) Stream {
    source := make(chan interface{})

    threading.GoSafe(func() {
        defer close(source)
        // 通过key进行去重,雷同key只保留一个
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range p.source {
            key := fn(item)
      // key存在则不保留
            if _, ok := keys[key]; !ok {
                source <- item
                keys[key] = lang.Placeholder
            }
        }
    })

    return Range(source)
}
Walk

Walk函数并发的作用在流中每一个item上,能够通过WithWorkers设置并发数,默认并发数为16,最小并发数为1,如设置unlimitedWorkers为true则并发数无限度,但并发写入流中的数据由defaultWorkers限度,WalkFunc中用户能够自定义后续写入流中的元素,能够不写入也能够写入多个元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
  newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {
  fmt.Println(item)
})

// 源码
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
    pipe := make(chan interface{}, option.workers)

    go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)

        for {
      // 管制并发数量
            pool <- lang.Placeholder
            item, ok := <-p.source
            if !ok {
                <-pool
                break
            }

            wg.Add(1)
            go func() {
                defer func() {
                    wg.Done()
                    <-pool
                }()
                // 作用在每个元素上
                fn(item, pipe)
            }()
        }

    // 期待解决实现
        wg.Wait()
        close(pipe)
    }()

    return Range(pipe)
}

并发解决

fx工具除了进行流数据处理以外还提供了函数并发性能,在微服务中实现某个性能往往须要依赖多个服务,并发的解决依赖能够无效的升高依赖耗时,晋升服务的性能。

fx.Parallel(func() {
  userRPC() // 依赖1
}, func() {
  accountRPC() // 依赖2
}, func() {
  orderRPC() // 依赖3
})

留神fx.Parallel进行依赖并行处理的时候不会有error返回,如需有error返回或者有一个依赖报错须要立马完结依赖申请请应用MapReduce工具进行解决。

总结

本篇文章介绍了流解决的基本概念和gozero中的流解决工具fx,在理论的生产中流解决场景利用也十分多,心愿本篇文章能给大家带来肯定的启发,更好的应答工作中的流解决场景。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理