乐趣区

关于go:Golang框架实战KisFlow流式计算框架4数据流

连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)
Golang 框架实战 -KisFlow 流式计算框架(4)- 数据流


3.1 数据类型定义

KisFlow 中能够传递任意类型数据作为 Flow 的数据源。而且 KisFlow 反对批量数据的流逝计算解决。

首先须要对 KisFlow 中外部反对的数据类型做一个根本的定义,咱们将这部分的定义代码写在 kis-flow/common/ 中的data_type.go 文件中。

kis-flow/common/data_type.go

package common

// KisRow 一行数据
type KisRow interface{}

// KisRowArr 一次业务的批量数据
type KisRowArr []KisRow

/*
    KisDataMap 以后 Flow 承载的全副数据,key    :  数据所在的 Function ID
    value: 对应的 KisRow
*/
type KisDataMap map[string]KisRowArr
  • KisRow:示意一行数据,能够是任意的数据类型,比方字符串,json 字符串,一些序列化的二进制数据,protobuf,yaml 字符串等,均可。
  • KisRowArr:示意多行数据,也就是一次提交的批量数据,他是 KisRow 的数组汇合。
  • KisDataMap:示意以后 Flow 承载的全副数据。是一个 map[string]KisRowArr 类型,其中 key 为数据所在的 Function ID,value 为数据。

3.2 KisFlow 数据流解决

在 KisFlow 模块中,新增一些存放数据的成员,如下:

kis-flow/flow/kis_flow.go

// KisFlow 用于贯通整条流式计算的上下文环境
type KisFlow struct {
    // 根底信息
    Id   string                // Flow 的分布式实例 ID(用于 KisFlow 外部辨别不同实例)
    Name string                // Flow 的可读名称
    Conf *config.KisFlowConfig // Flow 配置策略

    // Function 列表
    Funcs          map[string]kis.Function // 以后 flow 领有的全副治理的全副 Function 对象, key: FunctionID
    FlowHead       kis.Function            // 以后 Flow 所领有的 Function 列表表头
    FlowTail       kis.Function            // 以后 Flow 所领有的 Function 列表表尾
    flock          sync.RWMutex            // 治理链表插入读写的锁
    ThisFunction   kis.Function            // Flow 以后正在执行的 KisFunction 对象
    ThisFunctionId string                  // 以后执行到的 Function ID (策略配置 ID)
    PrevFunctionId string                  // 以后执行到的 Function 上一层 FunctionID(策略配置 ID)

    // Function 列表参数
    funcParams map[string]config.FParam // flow 在以后 Function 的自定义固定配置参数,Key:function 的实例 KisID, value:FParam
    fplock     sync.RWMutex             // 治理 funcParams 的读写锁

    // ++++++++ 数据 ++++++++++
    buffer common.KisRowArr  // 用来长期寄存输出字节数据的外部 Buf, 一条数据为 interface{}, 多条数据为[]interface{} 也就是 KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 以后 Function 的计算输出数据
}
  • buffer: 用来长期寄存输出字节数据的外部 Buf, 一条数据为 interface{}, 多条数据为[]interface{} 也就是 KisBatch
  • data: 流式计算各个层级的数据源
  • inPut: 以后 Function 的计算输出数据

后续章节会应用到这几个成员属性,这里先做为理解。

因为 data 是一个 map 类型,所以须要在 NewKisFlow() 中,对其进行初始化操作:

kis-flow/flow/kis_flow.go

// NewKisFlow 创立一个 KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {flow := new(KisFlow)
    // 实例 Id
    flow.Id = id.KisID(common.KisIdTypeFlow)

    // 根底信息
    flow.Name = conf.FlowName
    flow.Conf = conf

    // Function 列表
    flow.Funcs = make(map[string]kis.Function)
    flow.funcParams = make(map[string]config.FParam)

    // ++++++++ 数据 data +++++++
    flow.data = make(common.KisDataMap)

    return flow
}

3.2.2 业务提交数据接口

KisFlow 的开发者在编写业务时,能够通过 flow 实例来进行提交业务源数据,所以咱们须要给 Flow 形象层新增一个提交数据的接口:

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 调度 Flow,顺次调度 Flow 中的 Function 并且执行
    Run(ctx context.Context) error
    // Link 将 Flow 中的 Function 依照配置文件中的配置进行连贯
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow  ++++++ 提交 Flow 数据到行将执行的 Function 层 ++++
    CommitRow(row interface{}) error
}

新增接口 CommitRow(any interface{}) error

kis-flow/flow/kis_flow_data.go 中实现 KisFlow 的该接口。

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) CommitRow(row interface{}) error {flow.buffer = append(flow.buffer, row)

    return nil
}

CommitRow() 为提交 Flow 数据, 一行数据,如果是批量数据能够提交屡次。所有提交的数据都会暂存在flow.buffer 成员中,作为缓冲区。

3.2.3 KisFlow 外部数据提交

当初开发者能够通过 CommitRow() 将数据提交到 buffer 中,然而在 KisFlow 外部须要一个外部接口来将 buffer 提交到 KisFlow 的 data 中,作为之后以后 Flow 全副 Function 的上下文数据供应用。所以咱们这里须要再提供两个接口。别离是首次提交数据 commitSrcData() 和中间层提交数据 commitCurData() 两个函数。

A. 首层数据提交

kis-flow/flow/kis_flow_data.go

// commitSrcData 提交以后 Flow 的数据源数据, 示意首次提交以后 Flow 的原始数据源
// 将 flow 的长期数据 buffer,提交到 flow 的 data 中,(data 为各个 Function 层级的源数据备份)
// 会清空之前所有的 flow 数据
func (flow *KisFlow) commitSrcData(ctx context.Context) error {

    // 制作批量数据 batch
    dataCnt := len(flow.buffer)
    batch := make(common.KisRowArr, 0, dataCnt)

    for _, row := range flow.buffer {batch = append(batch, row)
    }

    // 清空之前所有数据
    flow.clearData(flow.data)

    // 首次提交,记录 flow 原始数据
    // 因为首次提交,所以 PrevFunctionId 为 FirstVirtual 因为没有上一层 Function
    flow.data[common.FunctionIdFirstVirtual] = batch

    // 清空缓冲 Buf
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

//ClearData 清空 flow 所有数据
func (flow *KisFlow) clearData(data common.KisDataMap) {
    for k := range data {delete(data, k)
    }
}

实际上 commitSrcData() 在整个的 Flow 运行周期只会执行一次,这个作为以后 Flow 的始祖源数据。

commitSrcData() 的最终目标是 将 buffer 的数据提交到 data[FunctionIdFirstVirtual] 中。这里要留神的是FunctionIdFirstVirtual 是一个虚构 fid,作为所有 Function 的上游 Function ID。并且首次提交之后,flow.buffer的数据将被清空。

B. 中间层数据提交

kis-flow/flow/kis_flow_data.go

//commitCurData 提交 Flow 以后执行 Function 的后果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // 判断本层计算是否有后果数据, 如果没有则退出本次 Flow Run 循环
    if len(flow.buffer) == 0 {return nil}

    // 制作批量数据 batch
    batch := make(common.KisRowArr, 0, len(flow.buffer))

    // 如果 strBuf 为空,则没有增加任何数据
    for _, row := range flow.buffer {batch = append(batch, row)
    }

    // 将本层计算的缓冲数据提交到本层后果数据中
    flow.data[flow.ThisFunctionId] = batch

    // 清空缓冲 Buf
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, "====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

commitCurData()会在每次 Function 执行计算后,将以后 Function 的计算结果数据进行提交。commitCurData() 会在 Flow 的流式计算过程中被执行屡次。

commitCurData()的最终目标是将将 buffer 的数据提交到data[flow.ThisFunctionId] 中。ThisFunctionId 也就是以后正在执行 Function,同时也是下一层将要执行的 Function 的上一层。

提交之后,flow.buffer的数据将被清空。

3.2.4 获取正在执行 Function 的源数据

至于每层 Function 的源数据如何失去,咱们能够通过 getCurData() 办法失去。通过 PrevFunctionId 进行索引,因为获取以后 Function 的源数据,就是上一层 Function 的后果数据,所以咱们通过 PrevFunctionId 来失去上一层 Function 的 Id,从 data[PrevFunctionId] 中能够失去数据源。

kis-flow/flow/kis_flow_data.go

// getCurData 获取 flow 以后 Function 层级的输出数据
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
    if flow.PrevFunctionId == "" {return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
    }

    if _, ok := flow.data[flow.PrevFunctionId]; !ok {return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
    }

    return flow.data[flow.PrevFunctionId], nil
}

3.2.5 数据流链式调度解决

上面咱们就要在 flow.Run() 办法中,来退出数据流的解决动作。

kis-flow/flow/kis_flow.go

// Run 启动 KisFlow 的流式计算, 从起始 Function 开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead

    if flow.Conf.Status == int(common.FlowDisable) {
        //flow 被配置敞开
        return nil
    }

    // ========= 数据流 新增 ===========
    // 因为此时还没有执行任何 Function, 所以 PrevFunctionId 为 FirstVirtual 因为没有上一层 Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // 提交数据流原始数据
    if err := flow.commitSrcData(ctx); err != nil {return err}
    // ========= 数据流 新增 ===========


    // 流式链式调用
    for fn != nil {

        // ========= 数据流 新增 ===========
        // flow 记录以后执行到的 Function 标记
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // 失去以后 Function 要解决与的源数据
        if inputData, err := flow.getCurData(); err != nil {log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {flow.inPut = inputData}
        // ========= 数据流 新增 ===========


        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success

            // ========= 数据流 新增 ===========
            if err := flow.commitCurData(ctx); err != nil {return err}

            // 更新上一层 FuncitonId 游标
            flow.PrevFunctionId = flow.ThisFunctionId
            // ========= 数据流 新增 ===========

            fn = fn.Next()}
    }

    return nil
}
  • 在 run() 刚执行的时候,对 PrevFunctionId 进行初始化,设置为 FunctionIdFirstVirtual
  • 在 run() 刚执行的时候,执行 commitSrcData() 将业务赋值的的 buffer 数据提交到 data[FunctionIdFirstVirtual]中。
  • 进入循环,执行每个 Function 的时候,getCurData()获取到以后 Function 的源数据,并且放在flow.inPut 成员中。
  • 进入循环,更正ThisFunctionId 游标为以后 Function ID。
  • 进入循环,每个 Funciton 执行结束后,将 Function 产生的后果数据通过 commitCurData() 进行提交,并且扭转 PrevFunctionId 为以后 FunctionID,进入下一层。

很显然,咱们还须要让 Flow 给开发者提供一个获取 Input 数据的接口。

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 调度 Flow,顺次调度 Flow 中的 Function 并且执行
    Run(ctx context.Context) error
    // Link 将 Flow 中的 Function 依照配置文件中的配置进行连贯
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交 Flow 数据到行将执行的 Function 层
    CommitRow(row interface{}) error

    // ++++++++++++++++++++++
    // Input 失去 flow 以后执行 Function 的输出源数据
    Input() common.KisRowArr}

实现如下:

kis-flow/flow/kis_flow_data.go

// Input 失去 flow 以后执行 Function 的输出源数据
func (flow *KisFlow) Input() common.KisRowArr {return flow.inPut}

3.3 KisFunction 的数据流解决

因为咱们的 Function 调度模块还目前还没有实现,所以无关 Function 在执行 Call() 办法的时候,只能临时将业务计算的逻辑写死在 KisFlow 框架中。在下一章节,咱们会将这部分的计算逻辑凋谢给开发者进行注册本人的业务。

当初 Flow 曾经将数据传递给了每层的 Function,那么在 Function 中咱们上面来简略模仿一下业务的根底计算逻辑。

咱们临时批改 KisFunctionCKisFunctionE 两个模块的 Call() 代码.
假如 KisFunctionC 是 KisFunctionE 的下层。

kis-flow/function/kis_function_c.go

type KisFunctionC struct {BaseFunction}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

    //TODO 调用具体的 Function 执行办法
    // 解决业务数据
    for i, row := range flow.Input() {fmt.Printf("In KisFunctionC, row = %+v\n", row)

        // 提交本层计算结果数据
        _ = flow.CommitRow("Data From KisFunctionC, index" + "" + fmt.Sprintf("%d", i))
    }

    return nil
}

kis-flow/function/kis_function_e.go

type KisFunctionE struct {BaseFunction}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法
    // 解决业务数据
    for _, row := range flow.Input() {fmt.Printf("In KisFunctionE, row = %+v\n", row)
    }

    return nil
}

3.4 数据流单元测试

上面咱们模仿一个简略的计算业务,测试下每层的 Function 是否能够失去数据,并且将计算结果传递给下一层。

kis-flow/test/kis_flow_test.go

func TestNewKisFlowData(t *testing.T) {ctx := context.Background()

    // 1. 创立 2 个 KisFunction 配置实例
    source1 := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "用户订单错误率",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {panic("myFuncConfig1 is nil")
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
    if myFuncConfig2 == nil {panic("myFuncConfig2 is nil")
    }

    // 2. 创立一个 KisFlow 配置实例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. 创立一个 KisFlow 对象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. 拼接 Functioin 到 Flow 上
    if err := flow1.Link(myFuncConfig1, nil); err != nil {panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {panic(err)
    }

    // 5. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 6. 执行 flow1
    if err := flow1.Run(ctx); err != nil {panic(err)
    }
}

这里咱们通过 flow.CommitRow() 提交了 3 行数据,每行数据是一个字符串,当然数据格式能够任意,数据类型也能够任意,只须要在各层的 Function 业务本身确定拉齐好即可。

cd 到 kis-flow/test/ 下执行命令:

go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData

后果如下:

=== RUN   TestNewKisFlowData
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

In KisFunctionC, row = This is Data1 from Test
In KisFunctionC, row = This is Data2 from Test
In KisFunctionC, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]]

KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]] inPut:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]}

In KisFunctionE, row = Data From KisFunctionC, index  0
In KisFunctionE, row = Data From KisFunctionC, index  1
In KisFunctionE, row = Data From KisFunctionC, index  2
--- PASS: TestNewKisFlowData (0.00s)
PASS
ok      kis-flow/test   0.636s

通过日志的具体校验,后果是合乎咱们预期的。

好了,目前数据流的最简略版本曾经实现了,下一章咱们将 Function 的业务逻辑凋谢给开发者,而不是写在 KisFlow 框架中.

3.5【V0.2】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.2


作者: 刘丹冰 Aceld github: https://github.com/aceld
KisFlow 开源我的项目地址:https://github.com/aceld/kis-flow


连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)
Golang 框架实战 -KisFlow 流式计算框架(4)- 数据流

退出移动版