连载中…
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
3.1 数据类型定义
KisFlow中能够传递任意类型数据作为Flow的数据源。而且KisFlow反对批量数据的流逝计算解决。
首先须要对KisFlow中外部反对的数据类型做一个根本的定义,咱们将这部分的定义代码写在kis-flow/common/
中的data_type.go
文件中。
kis-flow/common/data_type.go
package common
// KisRow 一行数据
type KisRow interface{}
// KisRowArr 一次业务的批量数据
type KisRowArr []KisRow
/*
KisDataMap 以后Flow承载的全副数据,
key : 数据所在的Function ID
value: 对应的KisRow
*/
type KisDataMap map[string]KisRowArr
KisRow
:示意一行数据,能够是任意的数据类型,比方字符串,json字符串,一些序列化的二进制数据, protobuf,yaml字符串等,均可。KisRowArr
:示意多行数据,也就是一次提交的批量数据,他是KisRow的数组汇合。KisDataMap
:示意以后Flow承载的全副数据。是一个map[string]KisRowArr
类型,其中key为数据所在的Function ID,value为数据。
3.2 KisFlow数据流解决
在KisFlow模块中,新增一些存放数据的成员,如下:
kis-flow/flow/kis_flow.go
// KisFlow 用于贯通整条流式计算的上下文环境
type KisFlow struct {
// 根底信息
Id string // Flow的分布式实例ID(用于KisFlow外部辨别不同实例)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 以后flow领有的全副治理的全副Function对象, key: FunctionID
FlowHead kis.Function // 以后Flow所领有的Function列表表头
FlowTail kis.Function // 以后Flow所领有的Function列表表尾
flock sync.RWMutex // 治理链表插入读写的锁
ThisFunction kis.Function // Flow以后正在执行的KisFunction对象
ThisFunctionId string // 以后执行到的Function ID (策略配置ID)
PrevFunctionId string // 以后执行到的Function 上一层FunctionID(策略配置ID)
// Function列表参数
funcParams map[string]config.FParam // flow在以后Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
fplock sync.RWMutex // 治理funcParams的读写锁
// ++++++++ 数据 ++++++++++
buffer common.KisRowArr // 用来长期寄存输出字节数据的外部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 以后Function的计算输出数据
}
buffer
: 用来长期寄存输出字节数据的外部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatchdata
: 流式计算各个层级的数据源inPut
: 以后Function的计算输出数据
后续章节会应用到这几个成员属性,这里先做为理解。
因为data是一个map
类型,所以须要在NewKisFlow()
中,对其进行初始化操作:
kis-flow/flow/kis_flow.go
// NewKisFlow 创立一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// 实例Id
flow.Id = id.KisID(common.KisIdTypeFlow)
// 根底信息
flow.Name = conf.FlowName
flow.Conf = conf
// Function列表
flow.Funcs = make(map[string]kis.Function)
flow.funcParams = make(map[string]config.FParam)
// ++++++++ 数据data +++++++
flow.data = make(common.KisDataMap)
return flow
}
3.2.2 业务提交数据接口
KisFlow的开发者在编写业务时,能够通过flow实例来进行提交业务源数据,所以咱们须要给Flow
形象层新增一个提交数据的接口:
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 调度Flow,顺次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function依照配置文件中的配置进行连贯
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow ++++++ 提交Flow数据到行将执行的Function层 ++++
CommitRow(row interface{}) error
}
新增接口 CommitRow(any interface{}) error
。
在kis-flow/flow/kis_flow_data.go
中实现KisFlow的该接口。
kis-flow/flow/kis_flow_data.go
func (flow *KisFlow) CommitRow(row interface{}) error {
flow.buffer = append(flow.buffer, row)
return nil
}
CommitRow()
为提交Flow数据, 一行数据,如果是批量数据能够提交屡次。 所有提交的数据都会暂存在flow.buffer
成员中,作为缓冲区。
3.2.3 KisFlow外部数据提交
当初开发者能够通过CommitRow()
将数据提交到buffer中,然而在KisFlow外部须要一个外部接口来将buffer提交到KisFlow的data中,作为之后以后Flow全副Function的上下文数据供应用。所以咱们这里须要再提供两个接口。别离是首次提交数据commitSrcData()
和中间层提交数据commitCurData()
两个函数。
A. 首层数据提交
kis-flow/flow/kis_flow_data.go
// commitSrcData 提交以后Flow的数据源数据, 示意首次提交以后Flow的原始数据源
// 将flow的长期数据buffer,提交到flow的data中,(data为各个Function层级的源数据备份)
// 会清空之前所有的flow数据
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// 制作批量数据batch
dataCnt := len(flow.buffer)
batch := make(common.KisRowArr, 0, dataCnt)
for _, row := range flow.buffer {
batch = append(batch, row)
}
// 清空之前所有数据
flow.clearData(flow.data)
// 首次提交,记录flow原始数据
// 因为首次提交,所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.data[common.FunctionIdFirstVirtual] = batch
// 清空缓冲Buf
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
//ClearData 清空flow所有数据
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)
}
}
实际上commitSrcData()
在整个的Flow
运行周期只会执行一次,这个作为以后Flow
的始祖源数据。
commitSrcData()
的最终目标是 将buffer的数据提交到data[FunctionIdFirstVirtual]
中。 这里要留神的是FunctionIdFirstVirtual
是一个虚构fid,作为所有Function
的上游Function ID。 并且首次提交之后,flow.buffer
的数据将被清空。
B. 中间层数据提交
kis-flow/flow/kis_flow_data.go
//commitCurData 提交Flow以后执行Function的后果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {
//判断本层计算是否有后果数据,如果没有则退出本次Flow Run循环
if len(flow.buffer) == 0 {
return nil
}
// 制作批量数据batch
batch := make(common.KisRowArr, 0, len(flow.buffer))
//如果strBuf为空,则没有增加任何数据
for _, row := range flow.buffer {
batch = append(batch, row)
}
//将本层计算的缓冲数据提交到本层后果数据中
flow.data[flow.ThisFunctionId] = batch
//清空缓冲Buf
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
commitCurData()
会在每次Function执行计算后,将以后Function的计算结果数据进行提交。 commitCurData() 会在Flow的流式计算过程中被执行屡次。
commitCurData()
的最终目标是将将buffer的数据提交到data[flow.ThisFunctionId]
中 。ThisFunctionId也就是以后正在执行Function,同时也是下一层将要执行的Function的上一层。
提交之后,flow.buffer
的数据将被清空。
3.2.4 获取正在执行Function的源数据
至于每层Function的源数据如何失去,咱们能够通过getCurData()
办法失去。 通过PrevFunctionId
进行索引,因为获取以后Function的源数据,就是上一层Function的后果数据,所以咱们通过PrevFunctionId
来失去上一层Function的Id,从data[PrevFunctionId
] 中能够失去数据源。
kis-flow/flow/kis_flow_data.go
// getCurData 获取flow以后Function层级的输出数据
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
if flow.PrevFunctionId == "" {
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
}
if _, ok := flow.data[flow.PrevFunctionId]; !ok {
return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
}
return flow.data[flow.PrevFunctionId], nil
}
3.2.5 数据流链式调度解决
上面咱们就要在flow.Run()
办法中,来退出数据流的解决动作。
kis-flow/flow/kis_flow.go
// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置敞开
return nil
}
// ========= 数据流 新增 ===========
// 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.PrevFunctionId = common.FunctionIdFirstVirtual
// 提交数据流原始数据
if err := flow.commitSrcData(ctx); err != nil {
return err
}
// ========= 数据流 新增 ===========
//流式链式调用
for fn != nil {
// ========= 数据流 新增 ===========
// flow记录以后执行到的Function 标记
fid := fn.GetId()
flow.ThisFunction = fn
flow.ThisFunctionId = fid
// 失去以后Function要解决与的源数据
if inputData, err := flow.getCurData(); err != nil {
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
return err
} else {
flow.inPut = inputData
}
// ========= 数据流 新增 ===========
if err := fn.Call(ctx, flow); err != nil {
//Error
return err
} else {
//Success
// ========= 数据流 新增 ===========
if err := flow.commitCurData(ctx); err != nil {
return err
}
// 更新上一层FuncitonId游标
flow.PrevFunctionId = flow.ThisFunctionId
// ========= 数据流 新增 ===========
fn = fn.Next()
}
}
return nil
}
- 在run() 刚执行的时候,对PrevFunctionId 进行初始化,设置为
FunctionIdFirstVirtual
。 - 在run() 刚执行的时候,执行
commitSrcData()
将业务赋值的的buffer数据提交到data[FunctionIdFirstVirtual
]中。 - 进入循环,执行每个Function的时候,
getCurData()
获取到以后Function的源数据,并且放在flow.inPut
成员中。 - 进入循环,更正
ThisFunctionId
游标为以后Function ID。 - 进入循环,每个Funciton执行结束后,将Function产生的后果数据通过
commitCurData()
进行提交,并且扭转PrevFunctionId
为以后FunctionID, 进入下一层。
很显然,咱们还须要让Flow
给开发者提供一个获取Input数据的接口。
kis-flow/kis/flow.go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 调度Flow,顺次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function依照配置文件中的配置进行连贯
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到行将执行的Function层
CommitRow(row interface{}) error
// ++++++++++++++++++++++
// Input 失去flow以后执行Function的输出源数据
Input() common.KisRowArr
}
实现如下:
kis-flow/flow/kis_flow_data.go
// Input 失去flow以后执行Function的输出源数据
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
}
3.3 KisFunction的数据流解决
因为咱们的Function调度模块还目前还没有实现,所以无关Function在执行Call()
办法的时候,只能临时将业务计算的逻辑写死在KisFlow框架中。 在下一章节,咱们会将这部分的计算逻辑凋谢给开发者进行注册本人的业务。
当初Flow曾经将数据传递给了每层的Function,那么在Function中咱们上面来简略模仿一下业务的根底计算逻辑。
咱们临时批改KisFunctionC
和 KisFunctionE
两个模块的Call()
代码.
假如KisFunctionC 是 KisFunctionE的下层。
kis-flow/function/kis_function_c.go
type KisFunctionC struct {
BaseFunction
}
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)
//TODO 调用具体的Function执行办法
//解决业务数据
for i, row := range flow.Input() {
fmt.Printf("In KisFunctionC, row = %+v\n", row)
// 提交本层计算结果数据
_ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i))
}
return nil
}
kis-flow/function/kis_function_e.go
type KisFunctionE struct {
BaseFunction
}
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)
// TODO 调用具体的Function执行办法
//解决业务数据
for _, row := range flow.Input() {
fmt.Printf("In KisFunctionE, row = %+v\n", row)
}
return nil
}
3.4 数据流单元测试
上面咱们模仿一个简略的计算业务,测试下每层的Function是否能够失去数据,并且将计算结果传递给下一层。
kis-flow/test/kis_flow_test.go
func TestNewKisFlowData(t *testing.T) {
ctx := context.Background()
// 1. 创立2个KisFunction配置实例
source1 := config.KisSource{
Name: "公众号抖音商城户订单数据",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用户订单错误率",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
if myFuncConfig2 == nil {
panic("myFuncConfig2 is nil")
}
// 2. 创立一个 KisFlow 配置实例
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创立一个KisFlow对象
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
if err := flow1.Link(myFuncConfig2, nil); err != nil {
panic(err)
}
// 5. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 6. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
这里咱们通过flow.CommitRow()
提交了3行数据,每行数据是一个字符串,当然数据格式能够任意,数据类型也能够任意,只须要在各层的Function业务本身确定拉齐好即可。
cd到kis-flow/test/
下执行命令:
go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData
后果如下:
=== RUN TestNewKisFlowData
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
In KisFunctionC, row = This is Data1 from Test
In KisFunctionC, row = This is Data2 from Test
In KisFunctionC, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]]
KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]] inPut:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]}
In KisFunctionE, row = Data From KisFunctionC, index 0
In KisFunctionE, row = Data From KisFunctionC, index 1
In KisFunctionE, row = Data From KisFunctionC, index 2
--- PASS: TestNewKisFlowData (0.00s)
PASS
ok kis-flow/test 0.636s
通过日志的具体校验,后果是合乎咱们预期的。
好了,目前数据流的最简略版本曾经实现了,下一章咱们将Function的业务逻辑凋谢给开发者,而不是写在KisFlow框架中.
3.5 【V0.2】源代码
https://github.com/aceld/kis-flow/releases/tag/v0.2
作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源我的项目地址:https://github.com/aceld/kis-flow
连载中…
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
发表回复