连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度

4.1 Router

当初,将KisFlow提供对外Function开放注册能力,首先咱们要定义一些注册函数原型,和治理这些Function的Router映射关系类型。

创立kis-flow/kis/router.go,定义原型如下:

kis-flow/kis/router.go
package kisimport "context"// FaaS Function as a Servicetype FaaS func(context.Context, Flow) error// funcRouter// key: Function Name// value: Function 回调自定义业务type funcRouter map[string]FaaS// flowRouter// key: Flow Name// value: Flowtype flowRouter map[string]Flow

FaaS:是开发者给KisFlow注册的Function回调业务函数原型,须要传递两个参数,Context和Flow,Context次要承载业务的上线文环境,Flow次要承载KisFlow的上下文环境,咱们能够通过Flow获取以后Function的配置信息,以后Function的数据信息,曾经Flow上其余节点的Function相干信息等。

funcRouter: 治理FunctionName和FaaS业务回调的映射关系Map,是一个公有类型,不对外提供援用。须要留神的是,funcRouter的key是FunctionName,因为FunctionId是生成的随机Id,开发者在注册路由的时候,并无奈预判和可读,所以关联的业务回调是与FunctionName做的映射关系。

flowRouter:治理FlowName和Flow实例的映射关系Map,是一个公有类型,不对外提供援用。flowRouter仍然是FlowName的映射关系。

4.2 KisPool

KisFlow提供一个用来治理全副全局映射关系的类KisPool,KisPool蕴含Router,且提供对Router的治理能力。

4.2.1 KisPool的定义

创立 kis-flow/kis/pool.go 文件,来创立kis_pool模块。

kis-flow/kis/pool.go
package kisimport (    "context"    "errors"    "fmt"    "kis-flow/log"    "sync")var _poolOnce sync.Once//  kisPool 用于治理全副的Function和Flow配置的池子type kisPool struct {    fnRouter funcRouter   // 全副的Function治理路由    fnLock   sync.RWMutex // fnRouter 锁    flowRouter flowRouter   // 全副的flow对象    flowLock   sync.RWMutex // flowRouter 锁}// 单例var _pool *kisPool// Pool 单例结构func Pool() *kisPool {    _poolOnce.Do(func() {        //创立kisPool对象        _pool = new(kisPool)        // fnRouter初始化        _pool.fnRouter = make(funcRouter)        // flowRouter初始化        _pool.flowRouter = make(flowRouter)    })    return _pool}

kis_pool采纳单例模式,Pool()办法为获取以后的单例,无关fnRouterflowRouter在生命周期只会初始化一次,通过sync.Once来管制。

4.2.2 注册及获取Flow

KisPool能够提供增加和获取Flow信息的接口,如下:

kis-flow/kis/pool.go
func (pool *kisPool) AddFlow(name string, flow Flow) {    pool.flowLock.Lock()    defer pool.flowLock.Unlock()    if _, ok := pool.flowRouter[name]; !ok {        pool.flowRouter[name] = flow    } else {        errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)        panic(errString)    }    log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)}func (pool *kisPool) GetFlow(name string) Flow {    pool.flowLock.RLock()    defer pool.flowLock.RUnlock()    if flow, ok := pool.flowRouter[name]; ok {        return flow    } else {        return nil    }}

AddFlow会依据雷同的FlowName进行做反复校验,雷同的Flow无奈注册屡次。

4.2.3 注册及调度Function

KisPool提供注册Funciton回调和调度Funciton办法, 如下。

kis-flow/kis/pool.go
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册func (pool *kisPool) FaaS(fnName string, f FaaS) {    pool.fnLock.Lock()    defer pool.fnLock.Unlock()    if _, ok := pool.fnRouter[fnName]; !ok {        pool.fnRouter[fnName] = f    } else {        errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)        panic(errString)    }    log.Logger().InfoF("Add KisPool FuncName=%s", fnName)}// CallFunction 调度 Functionfunc (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {    if f, ok := pool.fnRouter[fnName]; ok {        return f(ctx, flow)    }    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)    return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")}

CallFunction()中,须要传递参数Flow,作为数据流调度的上下文环境。 开发者在自定义FaaS中能够通过Flow来获取一些Funciton信息,所以这里须要给Flow补充几个获取配置信息的接口,之后如果再须要,再持续补充,具体如下:

kis-flow/kis/flow.go
type Flow interface {    // Run 调度Flow,顺次调度Flow中的Function并且执行    Run(ctx context.Context) error    // Link 将Flow中的Function依照配置文件中的配置进行连贯    Link(fConf *config.KisFuncConfig, fParams config.FParam) error    // CommitRow 提交Flow数据到行将执行的Function层    CommitRow(row interface{}) error    // Input 失去flow以后执行Function的输出源数据    Input() common.KisRowArr    // ++++++++++++++++++++++++++++++++++    // GetName 失去Flow的名称    GetName() string    // GetThisFunction 失去以后正在执行的Function    GetThisFunction() Function    // GetThisFuncConf 失去以后正在执行的Function的配置    GetThisFuncConf() *config.KisFuncConfig}
kis-flow/flow/kis_flow.go
func (flow *KisFlow) GetName() string {    return flow.Name}func (flow *KisFlow) GetThisFunction() kis.Function {    return flow.ThisFunction}func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {    return flow.ThisFunction.GetConfig()}

4.3 KisFunction援用KisPool调度

当初,咱们就能够在KisFunctionX中的Call()来通过Pool进行调度了,顺次批改每个Function的Call()办法。

kis-flow/function/kis_function_c.go
package functionimport (    "context"    "kis-flow/kis"    "kis-flow/log")type KisFunctionC struct {    BaseFunction}func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {    log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)    // 通过KisPool 路由到具体的执行计算Function中    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)        return err    }    return nil}
kis-flow/function/kis_function_e.go
package functionimport (    "context"    "kis-flow/kis"    "kis-flow/log")type KisFunctionE struct {    BaseFunction}func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {    log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)    // 通过KisPool 路由到具体的执行计算Function中    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)        return err    }    return nil}
kis-flow/function/kis_function_l.go
package functionimport (    "context"    "kis-flow/kis"    "kis-flow/log")type KisFunctionL struct {    BaseFunction}func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {    log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)    // 通过KisPool 路由到具体的执行计算Function中    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)        return err    }    return nil}
kis-flow/function/kis_function_s.go
package functionimport (    "context"    "kis-flow/kis"    "kis-flow/log")type KisFunctionS struct {    BaseFunction}func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {    log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)    // 通过KisPool 路由到具体的执行计算Function中    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)        return err    }    return nil}
kis-flow/function/kis_function_v.go
package functionimport (    "context"    "kis-flow/kis"    "kis-flow/log")type KisFunctionV struct {    BaseFunction}func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {    log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)    // 通过KisPool 路由到具体的执行计算Function中    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)        return err    }    return nil}

4.4 KisPool单元测试

接下来咱们来针对KisPool进行单元测试。

4.4.1 自定义FaaS

kis-flow/test/kis_pool_test.go
package testimport (    "context"    "fmt"    "kis-flow/common"    "kis-flow/config"    "kis-flow/flow"    "kis-flow/kis"    "testing")func funcName1Handler(ctx context.Context, flow kis.Flow) error {    fmt.Println("---> Call funcName1Handler ----")    for index, row := range flow.Input() {        // 打印数据        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)        fmt.Println(str)        // 计算结果数据        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)        // 提交后果数据        _ = flow.CommitRow(resultStr)    }    return nil}func funcName2Handler(ctx context.Context, flow kis.Flow) error {    for _, row := range flow.Input() {        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)        fmt.Println(str)    }    return nil}

4.4.2 注册FaaS及启动Flow

kis-flow/test/kis_pool_test.go
func TestNewKisPool(t *testing.T) {    ctx := context.Background()    // 0. 注册Function    kis.Pool().FaaS("funcName1", funcName1Handler)    kis.Pool().FaaS("funcName2", funcName2Handler)    // 1. 创立2个KisFunction配置实例    source1 := config.KisSource{        Name: "公众号抖音商城户订单数据",        Must: []string{"order_id", "user_id"},    }    source2 := config.KisSource{        Name: "用户订单错误率",        Must: []string{"order_id", "user_id"},    }    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)    if myFuncConfig1 == nil {        panic("myFuncConfig1 is nil")    }    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)    if myFuncConfig2 == nil {        panic("myFuncConfig2 is nil")    }    // 2. 创立一个 KisFlow 配置实例    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)    // 3. 创立一个KisFlow对象    flow1 := flow.NewKisFlow(myFlowConfig1)    // 4. 拼接Functioin 到 Flow 上    if err := flow1.Link(myFuncConfig1, nil); err != nil {        panic(err)    }    if err := flow1.Link(myFuncConfig2, nil); err != nil {        panic(err)    }    // 5. 提交原始数据    _ = flow1.CommitRow("This is Data1 from Test")    _ = flow1.CommitRow("This is Data2 from Test")    _ = flow1.CommitRow("This is Data3 from Test")    // 6. 执行flow1    if err := flow1.Run(ctx); err != nil {        panic(err)    }}

cd到kis-flow/test/下执行命令:

go test -test.v -test.paniconexit0 -test.run TestNewKisPool

后果如下:

=== RUN   TestNewKisPoolAdd KisPool FuncName=funcName1Add KisPool FuncName=funcName2context.Background====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}---> Call funcName1Handler ----In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from TestIn FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from TestIn FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Testcontext.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2--- PASS: TestNewKisPool (0.00s)PASSok      kis-flow/test   0.520s

通过日志的具体校验,后果是合乎咱们预期的。

好了,当初Function的业务能力曾经凋谢给开发者了,接下来咱们来持续欠缺KisFlow的能力。

4.5 【V0.3】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.3


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源我的项目地址:https://github.com/aceld/kis-flow


Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度