连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)


首先咱们要先定义KisFlow的外围构造体,KisFlow构造体,通过上述的设计理念,咱们得悉,KisFlow示意整个一条数据计算流的构造。其中每次数据在一个Flow上,顺次执行挂载在以后Flow的Function。

2.3.1 KisFunction家族

KisFunction应该是一个链式调用,所以构造体的根本状态应该是一个链表,通过一次Function的调用完结后,默认能够调度到下一个Function的节点上。 在KisFlow中,一共有 saveload, calculate, extend, varify等多种行为的Funciton,所以这里咱们采纳上述五种function的模板类,不便今后在不同针对不同特色的function做更加灵便和性能隔离的拓展和革新。

整体的KisFunction的类图设计如下:

2.2.2 形象层KisFunction定义

kis-flow中创立一个新的目录function用来寄存function的代码。
首先形象接口编写在kis/目录下。

kis-flow/kis/function.go
package kisimport (    "context"    "kis-flow/config")// Function 流式计算根底计算模块,KisFunction是一条流式计算的根本计算逻辑单元,//                任意个KisFunction能够组合成一个KisFlowtype Function interface {    // Call 执行流式计算逻辑    Call(ctx context.Context, flow Flow) error    // SetConfig 给以后Function实例配置策略    SetConfig(s *config.KisFuncConfig) error    // GetConfig 获取以后Function实例配置策略    GetConfig() *config.KisFuncConfig    // SetFlow 给以后Function实例设置所依赖的Flow实例    SetFlow(f Flow) error    // GetFlow 获取以后Functioin实力所依赖的Flow    GetFlow() Flow    // CreateId 给以后Funciton实力生成一个随机的实例KisID    CreateId()    // GetId 获取以后Function的FID    GetId() string    // GetPrevId 获取以后Function上一个Function节点FID    GetPrevId() string    // GetNextId 获取以后Function下一个Function节点FID    GetNextId() string    // Next 返回下一层计算流Function,如果以后层为最初一层,则返回nil    Next() Function    // Prev 返回上一层计算流Function,如果以后层为最初一层,则返回nil    Prev() Function    // SetN 设置下一层Function实例    SetN(f Function)    // SetP 设置上一层Function实例    SetP(f Function)}

2.2.3 KisId随机惟一实例ID

上述提出了一个新的概念KisId。 KisID为Function的实例ID,用于KisFlow外部辨别不同的实例对象。KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,而KisId则为在KisFlow中KisFunction曾经实例化的 实例对象ID 这个ID是随机生成且惟一。

创立kis-flow/id/目录,且创立kis_id.go 文件,实现无关kis_id生成的算法。

kis-flow/id/kis_id.go
package idimport (    "github.com/google/uuid"    "kis-flow/common"    "strings")// KisID 获取一个中随机实例ID// 格局为  "prefix1-[prefix2-][prefix3-]ID"// 如:flow-1234567890// 如:func-1234567890// 如: conn-1234567890// 如: func-1-1234567890func KisID(prefix ...string) (kisId string) {    idStr := strings.Replace(uuid.New().String(), "-", "", -1)    kisId = formatKisID(idStr, prefix...)    return}func formatKisID(idStr string, prefix ...string) string {    var kisId string    for _, fix := range prefix {        kisId += fix        kisId += common.KisIdJoinChar    }    kisId += idStr    return kisId}

kisId模块提供KisID()办法,这外面依赖了第三方分布式ID生成库github.com/google/uuid,生成的随机ID为一个字符串,且调用者能够提供多个前缀,通过-符号进行拼接,失去的随机字符串ID,如:func-1234567890

针对KisId的前缀,提供了一些字符串的枚举,如下:

kis-flow/common/const.go
// KisIdType 用户生成KisId的字符串前缀const (    KisIdTypeFlow       = "flow"    KisIdTypeConnnector = "conn"    KisIdTypeFunction   = "func"    KisIdTypeGlobal     = "global"    KisIdJoinChar       = "-")

2.2.4 BaseFunction根底父类

依照设计,咱们须要提供一个BaseFunction作为Function的一个子类,实现一些根底的性能接口。留下Call()让具体模式的KisFunctionX来重写实现,上面来进行对BaseFunction构造的定义。
kis-flow/function/创立kis_base_funciton.go 文件。

A. 构造定义

kis-flow/function/kis_base_function.go
package functionimport (    "context"    "errors"    "kis-flow/common"    "kis-flow/config"    "kis-flow/id"    "kis-flow/kis")type BaseFunction struct {    // Id , KisFunction的实例ID,用于KisFlow外部辨别不同的实例对象    Id     string    Config *config.KisFuncConfig    // flow    Flow kis.Flow //上下文环境KisFlow    // link    N kis.Function //下一个流计算Function    P kis.Function //上一个流计算Function}

B. 办法实现

kis-flow/function/kis_base_function.go
// Call// BaseFunction 为空实现,目标为了让其余具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此办法func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }func (base *BaseFunction) Next() kis.Function {    return base.N}func (base *BaseFunction) Prev() kis.Function {    return base.P}func (base *BaseFunction) SetN(f kis.Function) {    base.N = f}func (base *BaseFunction) SetP(f kis.Function) {    base.P = f}func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {    if s == nil {        return errors.New("KisFuncConfig is nil")    }    base.Config = s    return nil}func (base *BaseFunction) GetId() string {    return base.Id}func (base *BaseFunction) GetPrevId() string {    if base.P == nil {        //Function为首结点        return common.FunctionIdFirstVirtual    }    return base.P.GetId()}func (base *BaseFunction) GetNextId() string {    if base.N == nil {        //Function为尾结点        return common.FunctionIdLastVirtual    }    return base.N.GetId()}func (base *BaseFunction) GetConfig() *config.KisFuncConfig {    return base.Config}func (base *BaseFunction) SetFlow(f kis.Flow) error {    if f == nil {        return errors.New("KisFlow is nil")    }    base.Flow = f    return nil}func (base *BaseFunction) GetFlow() kis.Flow {    return base.Flow}func (base *BaseFunction) CreateId() {    base.Id = id.KisID(common.KisIdTypeFunction)}

这里留神 GetPrevId()GetNextId()两个办法实现,因为如果以后Functioin为双向链表中的第一个节点或者最初一个节点,那么他们的上一个或者下一个是没有节点的,那么ID也就不存在,为了在应用中不呈现得不到ID的状况,咱们提供了两个虚构FID,做非凡状况的边界解决,定义在const.go中。

kis-flow/common/const.go
const (    // FunctionIdFirstVirtual 为首结点Function上一层虚构的Function ID    FunctionIdFirstVirtual = "FunctionIdFirstVirtual"    // FunctionIdLastVirtual 为尾结点Function下一层虚构的Function ID    FunctionIdLastVirtual = "FunctionIdLastVirtual")

2.2.5 KisFunction的V/S/L/C/E等模式类定义

上面别离实现V/S/L/C/E 五种不同模式的KisFunction子类。这里别离用创立文件来实现。

A. KisFunctionV

kis-flow/function/kis_function_v.go
package functionimport (    "context"    "fmt"    "kis-flow/kis")type KisFunctionV struct {    BaseFunction}func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {    fmt.Printf("KisFunctionV, flow = %+v\n", flow)    // TODO 调用具体的Function执行办法    return nil}

B. KisFunctionS

kis-flow/function/kis_function_s.go
package functionimport (    "context"    "fmt"    "kis-flow/kis")type KisFunctionS struct {    BaseFunction}func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {    fmt.Printf("KisFunctionS, flow = %+v\n", flow)    // TODO 调用具体的Function执行办法    return nil}

C. KisFunctionL

kis-flow/function/kis_function_l.go
package functionimport (    "context"    "fmt"    "kis-flow/kis")type KisFunctionL struct {    BaseFunction}func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {    fmt.Printf("KisFunctionL, flow = %+v\n", flow)    // TODO 调用具体的Function执行办法    return nil}

D. KisFunctionC

kis-flow/function/kis_function_c.go
package functionimport (    "context"    "fmt"    "kis-flow/kis")type KisFunctionC struct {    BaseFunction}func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {    fmt.Printf("KisFunction_C, flow = %+v\n", flow)    // TODO 调用具体的Function执行办法    return nil}

E. KisFunctionE

kis-flow/function/kis_function_e.go
package functionimport (    "context"    "fmt"    "kis-flow/kis")type KisFunctionE struct {    BaseFunction}func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {    fmt.Printf("KisFunctionE, flow = %+v\n", flow)    // TODO 调用具体的Function执行办法    return nil}

2.2.6 创立KisFunction实例

上面提供一个创立具体模式Function的办法,这里采纳简略工厂办法模式来实现创建对象。

kis-flow/function/kis_base_function.go
func (base *BaseFunction) CreateId() {    base.Id = id.KisID(common.KisIdTypeFunction)}// NewKisFunction 创立一个NsFunction// flow: 以后所属的flow实例// s : 以后function的配置策略func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {    var f kis.Function    //工厂生产泛化对象    switch common.KisMode(config.FMode) {    case common.V:        f = new(KisFunctionV)        break    case common.S:        f = new(KisFunctionS)    case common.L:        f = new(KisFunctionL)    case common.C:        f = new(KisFunctionC)    case common.E:        f = new(KisFunctionE)    default:        //LOG ERROR        return nil    }    // 生成随机实例惟一ID    f.CreateId()    //设置根底信息属性    if err := f.SetConfig(config); err != nil {        panic(err)    }    if err := f.SetFlow(flow); err != nil {        panic(err)    }    return f}

留神 NewKisFunction()办法返回的是一个形象的interface Function

还要留神,目前到这里没有实现Flow对象,然而KisFunciton的创立须要依赖传递一个Flow对象,咱们当初能够临时简略创立一个Flow对象的构造方法,之后在实现Flow章节再欠缺这部分的代码。
kis-filw/kis/中创立flow.go文件。

kis-flow/kis/flow.go
package kisimport (    "context"    "kis-flow/config")type Flow interface {   // TODO}

kis-flow/flow/下创立kis_flow.go文件,实现如下:

kis-flow/flow/kis_flow.go
package flowimport "kis-flow/config"// KisFlow 用于贯通整条流式计算的上下文环境type KisFlow struct {    Id   string    Name string    // TODO}// TODO for test// NewKisFlow 创立一个KisFlow.func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {    flow := new(KisFlow)    // 根底信息    flow.Id = id.KisID(common.KisIdTypeFlow)    flow.Name = conf.FlowName    return flow}

2.2.7 单元测试KisFunction创立实例

当初来对上述的KisFunction实例的创立做一个简略的单元测试,在kis-flow/test/创立kis_function_test.go文件。

kis-flow/test/kis_function_test.go
package testimport (    "context"    "kis-flow/common"    "kis-flow/config"    "kis-flow/flow"    "kis-flow/function"    "testing")func TestNewKisFunction(t *testing.T) {    ctx := context.Background()    // 1. 创立一个KisFunction配置实例    source := config.KisSource{        Name: "公众号抖音商城户订单数据",        Must: []string{"order_id", "user_id"},    }    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)    if myFuncConfig1 == nil {        panic("myFuncConfig1 is nil")    }    // 2. 创立一个 KisFlow 配置实例    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)    // 3. 创立一个KisFlow对象    flow1 := flow.NewKisFlow(myFlowConfig1)    // 4. 创立一个KisFunction对象    func1 := function.NewKisFunction(flow1, myFuncConfig1)    if err := func1.Call(ctx, flow1); err != nil {        t.Errorf("func1.Call() error = %v", err)    }}

流程很简略,分为四个小步骤:

  1. 创立一个KisFunction配置实例
  2. 创立一个 KisFlow 配置实例
  3. 创立一个KisFlow对象
  4. 创立一个KisFunction对象

cd到kis-flow/test/目录下执行:

go test -test.v -test.paniconexit0 -test.run TestNewKisFunction

后果如下:

=== RUN   TestNewKisFunctionKisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}--- PASS: TestNewKisFunction (0.00s)PASSok      kis-flow/test   1.005s

咱们曾经调用到了具体的KisFunciton_C实例的Call()办法。


作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源我的项目地址:https://github.com/aceld/kis-flow


连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-我的项目构建/根底模块-(下)