共计 13109 个字符,预计需要花费 33 分钟才能阅读完成。
什么是流解决
如果有 java 应用教训的同学肯定会对 java8 的 Stream 拍案叫绝,极大的进步了们对于汇合类型数据的解决能力。
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
Stream 能让咱们反对链式调用和函数编程的格调来实现数据的解决,看起来数据像是在流水线一样一直的实时流转加工,最终被汇总。Stream 的实现思维就是将数据处理流程形象成了一个数据流,每次加工后返回一个新的流供应用。
Stream 性能定义
入手写代码之前,先想分明,把需要理分明是最重要的一步,咱们尝试代入作者的视角来思考整个组件的实现流程。首先把底层实现的逻辑放一下 , 先尝试从零开始进行性能定义 stream 性能。
Stream 的工作流程其实也属于生产消费者模型,整个流程跟工厂中的生产流程十分类似,尝试先定义一下 Stream 的生命周期:
- 创立阶段 / 数据获取(原料)
- 加工阶段 / 两头解决(流水线加工)
- 汇总阶段 / 终结操作(最终产品)
上面围绕 stream 的三个生命周期开始定义 API:
创立阶段
为了创立出数据流 stream 这一形象对象,能够了解为结构器。
咱们反对三种形式结构 stream,别离是:切片转换,channel 转换,函数式转换。
留神这个阶段的办法都是一般的公开办法,并不绑定 Stream 对象。
// 通过可变参数模式创立 stream
func Just(items ...interface{}) Stream
// 通过 channel 创立 stream
func Range(source <-chan interface{}) Stream
// 通过函数创立 stream
func From(generate GenerateFunc) Stream
// 拼接 stream
func Concat(s Stream, others ...Stream) Stream
加工阶段
加工阶段须要进行的操作往往对应了咱们的业务逻辑,比方:转换,过滤,去重,排序等等。
这个阶段的 API 属于 method 须要绑定到 Stream 对象上。
联合罕用的业务场景进行如下定义:
// 去除反复 item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤 item
Filter(filterFunc FilterFunc, opts ...Option) Stream
// 分组
Group(fn KeyFunc) Stream
// 返回前 n 个元素
Head(n int64) Stream
// 返回后 n 个元素
Tail(n int64) Stream
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 合并 item 到 slice 生成新的 stream
Merge() Stream
// 反转
Reverse() Stream
// 排序
Sort(fn LessFunc) Stream
// 作用在每个 item 上
Walk(fn WalkFunc, opts ...Option) Stream
// 聚合其余 Stream
Concat(streams ...Stream) Stream
加工阶段的解决逻辑都会返回一个新的 Stream 对象,这里有个根本的实现范式
汇总阶段
汇总阶段其实就是咱们想要的处理结果,比方:是否匹配,统计数量,遍历等等。
// 查看是否全副匹配
AllMatch(fn PredicateFunc) bool
// 查看是否存在至多一项匹配
AnyMatch(fn PredicateFunc) bool
// 查看全副不匹配
NoneMatch(fn PredicateFunc) bool
// 统计数量
Count() int
// 清空 stream
Done()
// 对所有元素执行操作
ForAll(fn ForAllFunc)
// 对每个元素执行操作
ForEach(fn ForEachFunc)
梳理完组件的需要边界后,咱们对于行将要实现的 Stream 有了更清晰的意识。在我的认知外面真正的架构师对于需要的把握以及后续演变能达到及其精准的境地,做到这一点离不开对需要的深刻思考以及洞穿需要背地的实质。通过代入作者的视角来模仿复盘整个我的项目的构建流程,学习作者的思维方法论这正是咱们学习开源我的项目最大的价值所在。
好了,咱们尝试定义出残缺的 Stream 接口全貌以及函数。
接口的作用不仅仅是模版作用,还在于利用其形象能力搭建我的项目整体的框架而不至于一开始就陷入细节,能疾速的将咱们的思考过程通过接口简洁的表达出来,学会养成自顶向下的思维办法从宏观的角度来察看整个零碎,一开始就陷入细节则很容易拔剑四顾心茫然。。。
rxOptions struct {
unlimitedWorkers bool
workers int
}
Option func(opts *rxOptions)
// key 生成器
//item - stream 中的元素
KeyFunc func(item interface{}) interface{}
// 过滤函数
FilterFunc func(item interface{}) bool
// 对象转换函数
MapFunc func(intem interface{}) interface{}
// 对象比拟
LessFunc func(a, b interface{}) bool
// 遍历函数
WalkFunc func(item interface{}, pip chan<- interface{})
// 匹配函数
PredicateFunc func(item interface{}) bool
// 对所有元素执行操作
ForAllFunc func(pip <-chan interface{})
// 对每个 item 执行操作
ForEachFunc func(item interface{})
// 对每个元素并发执行操作
ParallelFunc func(item interface{})
// 对所有元素执行聚合操作
ReduceFunc func(pip <-chan interface{}) (interface{}, error)
// item 生成函数
GenerateFunc func(source <-chan interface{})
Stream interface {
// 去除反复 item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤 item
Filter(filterFunc FilterFunc, opts ...Option) Stream
// 分组
Group(fn KeyFunc) Stream
// 返回前 n 个元素
Head(n int64) Stream
// 返回后 n 个元素
Tail(n int64) Stream
// 获取第一个元素
First() interface{}
// 获取最初一个元素
Last() interface{}
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 合并 item 到 slice 生成新的 stream
Merge() Stream
// 反转
Reverse() Stream
// 排序
Sort(fn LessFunc) Stream
// 作用在每个 item 上
Walk(fn WalkFunc, opts ...Option) Stream
// 聚合其余 Stream
Concat(streams ...Stream) Stream
// 查看是否全副匹配
AllMatch(fn PredicateFunc) bool
// 查看是否存在至多一项匹配
AnyMatch(fn PredicateFunc) bool
// 查看全副不匹配
NoneMatch(fn PredicateFunc) bool
// 统计数量
Count() int
// 清空 stream
Done()
// 对所有元素执行操作
ForAll(fn ForAllFunc)
// 对每个元素执行操作
ForEach(fn ForEachFunc)
}
channel() 办法用于获取 Stream 管道属性,因为在具体实现时咱们面向的是接口对象所以裸露一个公有办法 read 进去。
// 获取外部的数据容器 channel, 外部办法
channel() chan interface{}
实现思路
性能定义梳理分明了,接下来思考几个工程实现的问题。
如何实现链式调用
链式调用,创建对象用到的 builder 模式能够达到链式调用成果。实际上 Stream 实现相似链式的成果原理也是一样的,每次调用完后都创立一个新的 Stream 返回给用户。
// 去除反复 item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤 item
Filter(filterFunc FilterFunc, opts ...Option) Stream
如何实现流水线的解决成果
所谓的流水线能够了解为数据在 Stream 中的存储容器,在 go 中咱们能够应用 channel 作为数据的管道,达到 Stream 链式调用执行多个操作时 异步非阻塞 成果。
如何反对并行处理
数据加工实质上是在解决 channel 中的数据,那么要实现并行处理无非是并行生产 channel 而已,利用 goroutine 协程、WaitGroup 机制能够十分不便的实现并行处理。
go-zero 实现
core/fx/stream.go
go-zero 中对于 Stream 的实现并没有定义接口,不过没关系底层实现时逻辑是一样的。
为了实现 Stream 接口咱们定义一个外部的实现类,其中 source 为 channel 类型,模仿流水线性能。
Stream struct {source <-chan interface{}
}
创立 API
channel 创立 Range
通过 channel 创立 stream
func Range(source <-chan interface{}) Stream {
return Stream{source: source,}
}
可变参数模式创立 Just
通过可变参数模式创立 stream,channel 写完后及时 close 是个好习惯。
func Just(items ...interface{}) Stream {source := make(chan interface{}, len(items))
for _, item := range items {source <- item}
close(source)
return Range(source)
}
函数创立 From
通过函数创立 Stream
func From(generate GenerateFunc) Stream {source := make(chan interface{})
threading.GoSafe(func() {defer close(source)
generate(source)
})
return Range(source)
}
因为波及内部传入的函数参数调用,执行过程并不可用因而须要捕获运行时异样避免 panic 谬误传导到下层导致利用解体。
func Recover(cleanups ...func()) {
for _, cleanup := range cleanups {cleanup()
}
if r := recover(); r != nil {logx.ErrorStack(r)
}
}
func RunSafe(fn func()) {defer rescue.Recover()
fn()}
func GoSafe(fn func()) {go RunSafe(fn)
}
拼接 Concat
拼接其余 Stream 创立一个新的 Stream,调用外部 Concat method 办法,后文将会剖析 Concat 的源码实现。
func Concat(s Stream, others ...Stream) Stream {return s.Concat(others...)
}
加工 API
去重 Distinct
因为传入的是函数参数 KeyFunc func(item interface{}) interface{}
意味着也同时反对依照业务场景自定义去重,实质上是利用 KeyFunc 返回的后果基于 map 实现去重。
函数参数十分弱小,能极大的晋升灵活性。
func (s Stream) Distinct(keyFunc KeyFunc) Stream {source := make(chan interface{})
threading.GoSafe(func() {
// channel 记得敞开是个好习惯
defer close(source)
keys := make(map[interface{}]lang.PlaceholderType)
for item := range s.source {
// 自定义去重逻辑
key := keyFunc(item)
// 如果 key 不存在, 则将数据写入新的 channel
if _, ok := keys[key]; !ok {
source <- item
keys[key] = lang.Placeholder
}
}
})
return Range(source)
}
应用案例:
// 1 2 3 4 5
Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {return item}).ForEach(func(item interface{}) {t.Log(item)
})
// 1 2 3 4
Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {uid := item.(int)
// 对大于 4 的 item 进行非凡去重逻辑, 最终只保留一个 >3 的 item
if uid > 3 {return 4}
return item
}).ForEach(func(item interface{}) {t.Log(item)
})
过滤 Filter
通过将过滤逻辑形象成 FilterFunc,而后别离作用在 item 上依据 FilterFunc 返回的布尔值决定是否写回新的 channel 中实现过滤性能,理论的过滤逻辑委托给了 Walk method。
Option 参数蕴含两个选项:
- unlimitedWorkers 不限度协程数量
- workers 限度协程数量
FilterFunc func(item interface{}) bool
func (s Stream) Filter(filterFunc FilterFunc, opts ...Option) Stream {return s.Walk(func(item interface{}, pip chan<- interface{}) {if filterFunc(item) {pip <- item}
}, opts...)
}
应用示例:
func TestInternalStream_Filter(t *testing.T) {
// 保留偶数 2,4
channel := Just(1, 2, 3, 4, 5).Filter(func(item interface{}) bool {return item.(int)%2 == 0
}).channel()
for item := range channel {t.Log(item)
}
}
遍历执行 Walk
walk 英文意思是步行,这里的意思是对每个 item 都执行一次 WalkFunc 操作并将后果写入到新的 Stream 中。
这里留神一下因为外部采纳了协程机制异步执行读取和写入数据所以新的 Stream 中 channel 外面的数据程序是随机的。
// item-stream 中的 item 元素
// pipe-item 符合条件则写入 pipe
WalkFunc func(item interface{}, pipe chan<- interface{})
func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {option := buildOptions(opts...)
if option.unlimitedWorkers {return s.walkUnLimited(fn, option)
}
return s.walkLimited(fn, option)
}
func (s Stream) walkUnLimited(fn WalkFunc, option *rxOptions) Stream {
// 创立带缓冲区的 channel
// 默认为 16,channel 中元素超过 16 将会被阻塞
pipe := make(chan interface{}, defaultWorkers)
go func() {
var wg sync.WaitGroup
for item := range s.source {
// 须要读取 s.source 的所有元素
// 这里也阐明了为什么 channel 最初写完记得结束
// 如果不敞开可能导致协程始终阻塞导致透露
// 重要, 不赋值给 val 是个典型的并发陷阱,前面在另一个 goroutine 里应用了
val := item
wg.Add(1)
// 平安模式下执行函数
threading.GoSafe(func() {defer wg.Done()
fn(item, pipe)
})
}
wg.Wait()
close(pipe)
}()
// 返回新的 Stream
return Range(pipe)
}
func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {pipe := make(chan interface{}, option.workers)
go func() {
var wg sync.WaitGroup
// 管制协程数量
pool := make(chan lang.PlaceholderType, option.workers)
for item := range s.source {
// 重要, 不赋值给 val 是个典型的并发陷阱,前面在另一个 goroutine 里应用了
val := item
// 超过协程限度时将会被阻塞
pool <- lang.Placeholder
// 这里也阐明了为什么 channel 最初写完记得结束
// 如果不敞开可能导致协程始终阻塞导致透露
wg.Add(1)
// 平安模式下执行函数
threading.GoSafe(func() {defer func() {wg.Done()
// 执行实现后读取一次 pool 开释一个协程地位
<-pool
}()
fn(item, pipe)
})
}
wg.Wait()
close(pipe)
}()
return Range(pipe)
}
应用案例:
返回的程序是随机的。
func Test_Stream_Walk(t *testing.T) {
// 返回 300,100,200
Just(1, 2, 3).Walk(func(item interface{}, pip chan<- interface{}) {pip <- item.(int) * 100
}, WithWorkers(3)).ForEach(func(item interface{}) {t.Log(item)
})
}
分组 Group
通过对 item 匹配放入 map 中。
KeyFunc func(item interface{}) interface{}
func (s Stream) Group(fn KeyFunc) Stream {groups := make(map[interface{}][]interface{})
for item := range s.source {key := fn(item)
groups[key] = append(groups[key], item)
}
source := make(chan interface{})
go func() {
for _, group := range groups {source <- group}
close(source)
}()
return Range(source)
}
获取前 n 个元素 Head
n 大于理论数据集长度的话将会返回全副元素
func (s Stream) Head(n int64) Stream {
if n < 1 {panic("n must be greather than 1")
}
source := make(chan interface{})
go func() {
for item := range s.source {
n--
// n 值可能大于 s.source 长度, 须要判断是否 >=0
if n >= 0 {source <- item}
// let successive method go ASAP even we have more items to skip
// why we don't just break the loop, because if break,
// this former goroutine will block forever, which will cause goroutine leak.
// n== 0 阐明 source 曾经写满能够进行敞开了
// 既然 source 曾经满足条件了为什么不间接进行 break 跳出循环呢?
// 作者提到了避免协程透露
// 因为每次操作最终都会产生一个新的 Stream, 旧的 Stream 永远也不会被调用了
if n == 0 {close(source)
break
}
}
// 下面的循环跳进去了阐明 n 大于 s.source 理论长度
// 仍旧须要显示敞开新的 source
if n > 0 {close(source)
}
}()
return Range(source)
}
应用示例:
// 返回 1,2
func TestInternalStream_Head(t *testing.T) {channel := Just(1, 2, 3, 4, 5).Head(2).channel()
for item := range channel {t.Log(item)
}
}
获取后 n 个元素 Tail
这里很有意思,为了确保拿到最初 n 个元素应用环形切片 Ring 这个数据结构,先理解一下 Ring 的实现。
// 环形切片
type Ring struct {elements []interface{}
index int
lock sync.Mutex
}
func NewRing(n int) *Ring {
if n < 1 {panic("n should be greather than 0")
}
return &Ring{elements: make([]interface{}, n),
}
}
// 增加元素
func (r *Ring) Add(v interface{}) {r.lock.Lock()
defer r.lock.Unlock()
// 将元素写入切片指定地位
// 这里的取余实现了循环写成果
r.elements[r.index%len(r.elements)] = v
// 更新下次写入地位
r.index++
}
// 获取全副元素
// 读取程序放弃与写入程序统一
func (r *Ring) Take() []interface{} {r.lock.Lock()
defer r.lock.Unlock()
var size int
var start int
// 当呈现循环写的状况时
// 开始读取地位须要通过去余实现, 因为咱们心愿读取进去的程序与写入程序统一
if r.index > len(r.elements) {size = len(r.elements)
// 因为呈现循环写状况, 以后写入地位 index 开始为最旧的数据
start = r.index % len(r.elements)
} else {size = r.index}
elements := make([]interface{}, size)
for i := 0; i < size; i++ {
// 取余实现环形读取, 读取程序放弃与写入程序统一
elements[i] = r.elements[(start+i)%len(r.elements)]
}
return elements
}
总结一下环形切片的长处:
- 反对主动滚动更新
- 节俭内存
环形切片能实现固定容量满的状况下旧数据一直被新数据笼罩,因为这个个性能够用于读取 channel 后 n 个元素。
func (s Stream) Tail(n int64) Stream {
if n < 1 {panic("n must be greather than 1")
}
source := make(chan interface{})
go func() {ring := collection.NewRing(int(n))
// 读取全副元素,如果数量 >n 环形切片能实现新数据笼罩旧数据
// 保障获取到的肯定最初 n 个元素
for item := range s.source {ring.Add(item)
}
for _, item := range ring.Take() {source <- item}
close(source)
}()
return Range(source)
}
那么为什么不间接应用 len(source) 长度的切片呢?
答案是节俭内存。但凡波及到环形类型的数据结构时都具备一个长处那就省内存,能做到按需分配资源。
应用示例:
func TestInternalStream_Tail(t *testing.T) {
// 4,5
channel := Just(1, 2, 3, 4, 5).Tail(2).channel()
for item := range channel {t.Log(item)
}
// 1,2,3,4,5
channel2 := Just(1, 2, 3, 4, 5).Tail(6).channel()
for item := range channel2 {t.Log(item)
}
}
元素转换 Map
元素转换,外部由协程实现转换操作,留神输入 channel 并不保障按原序输入。
MapFunc func(intem interface{}) interface{}
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {return s.Walk(func(item interface{}, pip chan<- interface{}) {pip <- fn(item)
}, opts...)
}
应用示例:
func TestInternalStream_Map(t *testing.T) {channel := Just(1, 2, 3, 4, 5, 2, 2, 2, 2, 2, 2).Map(func(item interface{}) interface{} {return item.(int) * 10
}).channel()
for item := range channel {t.Log(item)
}
}
合并 Merge
实现比较简单,我思考了很久没想到有什么场景适宜这个办法。
func (s Stream) Merge() Stream {var items []interface{}
for item := range s.source {items = append(items, item)
}
source := make(chan interface{}, 1)
source <- items
return Range(source)
}
反转 Reverse
反转 channel 中的元素。反转算法流程是:
- 找到两头节点
- 节点两边开始两两替换
留神一下为什么获取 s.source 时用切片来接管呢? 切片会主动扩容,用数组不是更好吗?
其实这里是不能用数组的,因为不晓得 Stream 写入 source 的操作往往是在协程异步写入的,每个 Stream 中的 channel 都可能在动态变化,用流水线来比喻 Stream 工作流程确实十分形象。
func (s Stream) Reverse() Stream {var items []interface{}
for item := range s.source {items = append(items, item)
}
for i := len(items)/2 - 1; i >= 0; i-- {opp := len(items) - 1 - i
items[i], items[opp] = items[opp], items[i]
}
return Just(items...)
}
应用示例:
func TestInternalStream_Reverse(t *testing.T) {channel := Just(1, 2, 3, 4, 5).Reverse().channel()
for item := range channel {t.Log(item)
}
}
排序 Sort
内网调用 slice 官网包的排序计划,传入比拟函数实现比拟逻辑即可。
func (s Stream) Sort(fn LessFunc) Stream {var items []interface{}
for item := range s.source {items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {return fn(i, j)
})
return Just(items...)
}
应用示例:
// 5,4,3,2,1
func TestInternalStream_Sort(t *testing.T) {channel := Just(1, 2, 3, 4, 5).Sort(func(a, b interface{}) bool {return a.(int) > b.(int)
}).channel()
for item := range channel {t.Log(item)
}
}
拼接 Concat
func (s Stream) Concat(steams ...Stream) Stream {
// 创立新的无缓冲 channel
source := make(chan interface{})
go func() {
// 创立一个 waiGroup 对象
group := threading.NewRoutineGroup()
// 异步从原 channel 读取数据
group.Run(func() {
for item := range s.source {source <- item}
})
// 异步读取待拼接 Stream 的 channel 数据
for _, stream := range steams {
// 每个 Stream 开启一个协程
group.Run(func() {for item := range stream.channel() {source <- item}
})
}
// 阻塞期待读取实现
group.Wait()
close(source)
}()
// 返回新的 Stream
return Range(source)
}
汇总 API
全副匹配 AllMatch
func (s Stream) AllMatch(fn PredicateFunc) bool {
for item := range s.source {if !fn(item) {
// 须要排空 s.source,否则后面的 goroutine 可能阻塞
go drain(s.source)
return false
}
}
return true
}
任意匹配 AnyMatch
func (s Stream) AnyMatch(fn PredicateFunc) bool {
for item := range s.source {if fn(item) {
// 须要排空 s.source,否则后面的 goroutine 可能阻塞
go drain(s.source)
return true
}
}
return false
}
一个也不匹配 NoneMatch
func (s Stream) NoneMatch(fn func(item interface{}) bool) bool {
for item := range s.source {if fn(item) {
// 须要排空 s.source,否则后面的 goroutine 可能阻塞
go drain(s.source)
return false
}
}
return true
}
数量统计 Count
func (s Stream) Count() int {
var count int
for range s.source {count++}
return count
}
清空 Done
func (s Stream) Done() {
// 排空 channel,避免 goroutine 阻塞泄露
drain(s.source)
}
迭代全副元素 ForAll
func (s Stream) ForAll(fn ForAllFunc) {fn(s.source)
}
迭代每个元素 ForEach
func (s Stream) ForAll(fn ForAllFunc) {fn(s.source)
}
小结
至此 Stream 组件就全副实现完了,外围逻辑是利用 channel 当做管道,数据当做水流,一直的用协程接管 / 写入数据到 channel 中达到异步非阻塞的成果。
回到开篇提到的问题,未入手前想要实现一个 stream 难度仿佛十分大,很难设想在 go 中 300 多行的代码就能实现如此弱小的组件。
实现高效的根底起源三个语言个性:
- channel
- 协程
- 函数式编程
参考资料
pipeline 模式
切片反转算法
我的项目地址
https://github.com/zeromicro/go-zero
欢送应用 go-zero
并 star 反对咱们!
微信交换群
关注『微服务实际 』公众号并点击 交换群 获取社区群二维码。