关于golang:gozero-数据的流处理利器-fx

62次阅读

共计 4997 个字符,预计需要花费 13 分钟才能阅读完成。

转自 go-zero 点击查看原文
go-zero 微服务库地址 https://github.com/tal-tech/go-zero

数据的流解决利器

流解决 (Stream processing) 是一种计算机编程范式,其容许给定一个数据序列 (流解决数据源),一系列数据操作(函数) 被利用到流中的每个元素。同时流解决工具能够显著进步程序员的开发效率,容许他们编写无效、洁净和简洁的代码。

流数据处理在咱们的日常工作中十分常见,举个例子,咱们在业务开发中往往会记录许多业务日志,这些日志个别是先发送到 Kafka,而后再由 Job 生产 Kafaka 写到 elasticsearch,在进行日志流解决的过程中,往往还会对日志做一些解决,比方过滤有效的日志,做一些计算以及重新组合日志等等,示意图如下:

流解决工具 fx

gozero 是一个性能齐备的微服务框架,框架中内置了很多十分实用的工具,其中就蕴含流数据处理工具 fx,上面咱们通过一个简略的例子来意识下该工具:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/tal-tech/go-zero/core/fx"
)

func main() {ch := make(chan int)

    go inputStream(ch)
    go outputStream(ch)

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
    <-c
}

func inputStream(ch chan int) {
    count := 0
    for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
    }
}

func outputStream(ch chan int) {fx.From(func(source chan<- interface{}) {
        for c := range ch {source <- c}
    }).Walk(func(item interface{}, pipe chan<- interface{}) {count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {itemInt := item.(int)
        if itemInt%2 == 0 {return true}
        return false
    }).ForEach(func(item interface{}) {fmt.Println(item)
    })
}

inputStream 函数模仿了流数据的产生,outputStream 函数模仿了流数据的处理过程,其中 From 函数为流的输出,Walk 函数并发的作用在每一个 item 上,Filter 函数对 item 进行过滤为 true 保留为 false 不保留,ForEach 函数遍历输入每一个 item 元素。

流数据处理两头操作

一个流的数据处理可能存在许多的两头操作,每个两头操作都能够作用在流上。就像流水线上的工人一样,每个工人操作完整机后都会返回解决实现的新整机,同理流解决两头操作实现后也会返回一个新的流。

fx 的流解决两头操作:

操作函数 性能 输出
Distinct 去除反复的 item KeyFunc,返回须要去重的 key
Filter 过滤不满足条件的 item FilterFunc,Option 管制并发量
Group 对 item 进行分组 KeyFunc,以 key 进行分组
Head 取出前 n 个 item,返回新 stream int64 保留数量
Map 对象转换 MapFunc,Option 管制并发量
Merge 合并 item 到 slice 并生成新 stream
Reverse 反转 item
Sort 对 item 进行排序 LessFunc 实现排序算法
Tail 与 Head 性能相似,取出后 n 个 item 组成新 stream int64 保留数量
Walk 作用在每个 item 上 WalkFunc,Option 管制并发量

下图展现了每个步骤和每个步骤的后果:

用法与原理剖析

From

通过 From 函数构建流并返回 Stream,流数据通过 channel 进行存储:

// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {source <- v}
})

// 源码
func From(generate GenerateFunc) Stream {source := make(chan interface{})

    go func() {defer close(source)
    // 结构流数据写入 channel
        generate(source)
    }()

    return Range(source)
}
Filter

Filter 函数提供过滤 item 的性能,FilterFunc 定义过滤逻辑 true 保留 item,false 则不保留:

// 例子 保留偶数
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
  for _, v := range s {source <- v}
}).Filter(func(item interface{}) bool {if item.(int)%2 == 0 {return true}
  return false
})

// 源码
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {return p.Walk(func(item interface{}, pipe chan<- interface{}) {
    // 执行过滤函数 true 保留,false 抛弃
        if fn(item) {pipe <- item}
    }, opts...)
}
Group

Group 对流数据进行分组,需定义分组的 key,数据分组后以 slice 存入 channel:

// 例子 依照首字符 "g" 或者 "p" 分组,没有则分到另一组
    ss := []string{"golang", "google", "php", "python", "java", "c++"}
    fx.From(func(source chan<- interface{}) {
        for _, s := range ss {source <- s}
    }).Group(func(item interface{}) interface{} {if strings.HasPrefix(item.(string), "g") {return "g"} else if strings.HasPrefix(item.(string), "p") {return "p"}
        return ""
    }).ForEach(func(item interface{}) {fmt.Println(item)
    })
}

// 源码
func (p Stream) Group(fn KeyFunc) Stream {
  // 定义分组存储 map
    groups := make(map[interface{}][]interface{})
    for item := range p.source {
    // 用户自定义分组 key
        key := fn(item)
    // key 雷同分到一组
        groups[key] = append(groups[key], item)
    }

    source := make(chan interface{})
    go func() {
        for _, group := range groups {
      // 雷同 key 的一组数据写入到 channel
            source <- group
        }
        close(source)
    }()

    return Range(source)
}
Reverse

reverse 能够对流中元素进行反转解决:

// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {fmt.Println(item)
})

// 源码
func (p Stream) Reverse() Stream {var items []interface{}
  // 获取流中数据
    for item := range p.source {items = append(items, item)
    }
    // 反转算法
    for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
    }
    
  // 写入流
    return Just(items...)
}
Distinct

distinct 对流中元素进行去重,去重在业务开发中比拟罕用,常常须要对用户 id 等做去重操作:

// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {fmt.Println(item)
})
// 后果为 1,2,3,4,5,6

// 源码
func (p Stream) Distinct(fn KeyFunc) Stream {source := make(chan interface{})

    threading.GoSafe(func() {defer close(source)
        // 通过 key 进行去重,雷同 key 只保留一个
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range p.source {key := fn(item)
      // key 存在则不保留
            if _, ok := keys[key]; !ok {
                source <- item
                keys[key] = lang.Placeholder
            }
        }
    })

    return Range(source)
}
Walk

Walk 函数并发的作用在流中每一个 item 上,能够通过 WithWorkers 设置并发数,默认并发数为 16,最小并发数为 1,如设置 unlimitedWorkers 为 true 则并发数无限度,但并发写入流中的数据由 defaultWorkers 限度,WalkFunc 中用户能够自定义后续写入流中的元素,能够不写入也能够写入多个元素:

// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {newItem := strings.ToUpper(item.(string))
  pipe <- newItem
}).ForEach(func(item interface{}) {fmt.Println(item)
})

// 源码
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {pipe := make(chan interface{}, option.workers)

    go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)

        for {
      // 管制并发数量
            pool <- lang.Placeholder
            item, ok := <-p.source
            if !ok {
                <-pool
                break
            }

            wg.Add(1)
            go func() {defer func() {wg.Done()
                    <-pool
                }()
                // 作用在每个元素上
                fn(item, pipe)
            }()}

    // 期待解决实现
        wg.Wait()
        close(pipe)
    }()

    return Range(pipe)
}

并发解决

fx 工具除了进行流数据处理以外还提供了函数并发性能,在微服务中实现某个性能往往须要依赖多个服务,并发的解决依赖能够无效的升高依赖耗时,晋升服务的性能。

fx.Parallel(func() {userRPC() // 依赖 1
}, func() {accountRPC() // 依赖 2
}, func() {orderRPC() // 依赖 3
})

留神 fx.Parallel 进行依赖并行处理的时候不会有 error 返回,如需有 error 返回或者有一个依赖报错须要立马完结依赖申请请应用 MapReduce 工具进行解决。

总结

本篇文章介绍了流解决的基本概念和 gozero 中的流解决工具 fx,在理论的生产中流解决场景利用也十分多,心愿本篇文章能给大家带来肯定的启发,更好的应答工作中的流解决场景。

正文完
 0