关于go:Golang框架实战KisFlow流式计算框架5Function调度

3次阅读

共计 11439 个字符,预计需要花费 29 分钟才能阅读完成。

连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)
Golang 框架实战 -KisFlow 流式计算框架(4)- 数据流
Golang 框架实战 -KisFlow 流式计算框架(5)-Function 调度

4.1 Router

当初,将 KisFlow 提供对外 Function 开放注册能力,首先咱们要定义一些注册函数原型,和治理这些 Function 的 Router 映射关系类型。

创立kis-flow/kis/router.go,定义原型如下:

kis-flow/kis/router.go

package kis

import "context"

// FaaS Function as a Service
type FaaS func(context.Context, Flow) error

// funcRouter
// key: Function Name
// value: Function 回调自定义业务
type funcRouter map[string]FaaS

// flowRouter
// key: Flow Name
// value: Flow
type flowRouter map[string]Flow

FaaS:是开发者给 KisFlow 注册的 Function 回调业务函数原型,须要传递两个参数,Context 和 Flow,Context 次要承载业务的上线文环境,Flow 次要承载 KisFlow 的上下文环境,咱们能够通过 Flow 获取以后 Function 的配置信息,以后 Function 的数据信息,曾经 Flow 上其余节点的 Function 相干信息等。

funcRouter: 治理 FunctionName 和 FaaS 业务回调的映射关系 Map,是一个公有类型,不对外提供援用。须要留神的是,funcRouter 的 key 是 FunctionName,因为 FunctionId 是生成的随机 Id,开发者在注册路由的时候,并无奈预判和可读,所以关联的业务回调是与 FunctionName 做的映射关系。

flowRouter: 治理 FlowName 和 Flow 实例的映射关系 Map,是一个公有类型,不对外提供援用。flowRouter仍然是 FlowName 的映射关系。

4.2 KisPool

KisFlow 提供一个用来治理全副全局映射关系的类KisPool,KisPool 蕴含 Router,且提供对 Router 的治理能力。

4.2.1 KisPool 的定义

创立 kis-flow/kis/pool.go 文件,来创立 kis_pool 模块。

kis-flow/kis/pool.go

package kis

import (
    "context"
    "errors"
    "fmt"
    "kis-flow/log"
    "sync"
)

var _poolOnce sync.Once

//  kisPool 用于治理全副的 Function 和 Flow 配置的池子
type kisPool struct {
    fnRouter funcRouter   // 全副的 Function 治理路由
    fnLock   sync.RWMutex // fnRouter 锁

    flowRouter flowRouter   // 全副的 flow 对象
    flowLock   sync.RWMutex // flowRouter 锁
}

// 单例
var _pool *kisPool

// Pool 单例结构
func Pool() *kisPool {_poolOnce.Do(func() {
        // 创立 kisPool 对象
        _pool = new(kisPool)

        // fnRouter 初始化
        _pool.fnRouter = make(funcRouter)

        // flowRouter 初始化
        _pool.flowRouter = make(flowRouter)
    })

    return _pool
}

kis_pool采纳单例模式,Pool()办法为获取以后的单例,无关 fnRouterflowRouter在生命周期只会初始化一次,通过 sync.Once 来管制。

4.2.2 注册及获取 Flow

KisPool 能够提供增加和获取 Flow 信息的接口,如下:

kis-flow/kis/pool.go

func (pool *kisPool) AddFlow(name string, flow Flow) {pool.flowLock.Lock()
    defer pool.flowLock.Unlock()

    if _, ok := pool.flowRouter[name]; !ok {pool.flowRouter[name] = flow
    } else {errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
        panic(errString)
    }

    log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
}

func (pool *kisPool) GetFlow(name string) Flow {pool.flowLock.RLock()
    defer pool.flowLock.RUnlock()

    if flow, ok := pool.flowRouter[name]; ok {return flow} else {return nil}
}

AddFlow 会依据雷同的 FlowName 进行做反复校验,雷同的 Flow 无奈注册屡次。

4.2.3 注册及调度 Function

KisPool 提供注册 Funciton 回调和调度 Funciton 办法,如下。

kis-flow/kis/pool.go

// FaaS 注册 Function 计算业务逻辑, 通过 Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {pool.fnLock.Lock()
    defer pool.fnLock.Unlock()

    if _, ok := pool.fnRouter[fnName]; !ok {pool.fnRouter[fnName] = f
    } else {errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {if f, ok := pool.fnRouter[fnName]; ok {return f(ctx, flow)
    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName:" + fnName + "Can not find in NsPool, Not Added.")
}

CallFunction() 中,须要传递参数 Flow,作为数据流调度的上下文环境。开发者在自定义 FaaS 中能够通过 Flow 来获取一些 Funciton 信息,所以这里须要给 Flow 补充几个获取配置信息的接口,之后如果再须要,再持续补充,具体如下:

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度 Flow,顺次调度 Flow 中的 Function 并且执行
    Run(ctx context.Context) error
    // Link 将 Flow 中的 Function 依照配置文件中的配置进行连贯
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交 Flow 数据到行将执行的 Function 层
    CommitRow(row interface{}) error
    // Input 失去 flow 以后执行 Function 的输出源数据
    Input() common.KisRowArr

    // ++++++++++++++++++++++++++++++++++
    // GetName 失去 Flow 的名称
    GetName() string
    // GetThisFunction 失去以后正在执行的 Function
    GetThisFunction() Function
    // GetThisFuncConf 失去以后正在执行的 Function 的配置
    GetThisFuncConf() *config.KisFuncConfig}

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetName() string {return flow.Name}

func (flow *KisFlow) GetThisFunction() kis.Function {return flow.ThisFunction}

func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {return flow.ThisFunction.GetConfig()
}

4.3 KisFunction 援用 KisPool 调度

当初,咱们就能够在 KisFunctionX 中的 Call() 来通过 Pool 进行调度了,顺次批改每个 Function 的 Call()办法。

kis-flow/function/kis_function_c.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionC struct {BaseFunction}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

    // 通过 KisPool 路由到具体的执行计算 Function 中
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_e.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionE struct {BaseFunction}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

    // 通过 KisPool 路由到具体的执行计算 Function 中
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_l.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionL struct {BaseFunction}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)

    // 通过 KisPool 路由到具体的执行计算 Function 中
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_s.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionS struct {BaseFunction}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)

    // 通过 KisPool 路由到具体的执行计算 Function 中
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_v.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionV struct {BaseFunction}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)

    // 通过 KisPool 路由到具体的执行计算 Function 中
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

4.4 KisPool 单元测试

接下来咱们来针对 KisPool 进行单元测试。

4.4.1 自定义 FaaS

kis-flow/test/kis_pool_test.go

package test

import (
    "context"
    "fmt"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/kis"
    "testing"
)

func funcName1Handler(ctx context.Context, flow kis.Flow) error {fmt.Println("---> Call funcName1Handler ----")

    for index, row := range flow.Input() {
        // 打印数据
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // 计算结果数据
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交后果数据
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

func funcName2Handler(ctx context.Context, flow kis.Flow) error {for _, row := range flow.Input() {str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}

4.4.2 注册 FaaS 及启动 Flow

kis-flow/test/kis_pool_test.go

func TestNewKisPool(t *testing.T) {ctx := context.Background()

    // 0. 注册 Function
    kis.Pool().FaaS("funcName1", funcName1Handler)
    kis.Pool().FaaS("funcName2", funcName2Handler)

    // 1. 创立 2 个 KisFunction 配置实例
    source1 := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "用户订单错误率",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {panic("myFuncConfig1 is nil")
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
    if myFuncConfig2 == nil {panic("myFuncConfig2 is nil")
    }

    // 2. 创立一个 KisFlow 配置实例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. 创立一个 KisFlow 对象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. 拼接 Functioin 到 Flow 上
    if err := flow1.Link(myFuncConfig1, nil); err != nil {panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {panic(err)
    }

    // 5. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 6. 执行 flow1
    if err := flow1.Run(ctx); err != nil {panic(err)
    }
}

cd 到 kis-flow/test/ 下执行命令:

go test -test.v -test.paniconexit0 -test.run TestNewKisPool

后果如下:

=== RUN   TestNewKisPool
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2
--- PASS: TestNewKisPool (0.00s)
PASS
ok      kis-flow/test   0.520s

通过日志的具体校验,后果是合乎咱们预期的。

好了,当初 Function 的业务能力曾经凋谢给开发者了,接下来咱们来持续欠缺 KisFlow 的能力。

4.5【V0.3】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.3


作者: 刘丹冰 Aceld github: https://github.com/aceld
KisFlow 开源我的项目地址:https://github.com/aceld/kis-flow


Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)
Golang 框架实战 -KisFlow 流式计算框架(4)- 数据流
Golang 框架实战 -KisFlow 流式计算框架(5)-Function 调度

正文完
 0