乐趣区

关于go:Golang框架实战KisFlow流式计算框架3项目构建基础模块下

连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)


首先咱们要先定义 KisFlow 的外围构造体,KisFlow构造体,通过上述的设计理念,咱们得悉,KisFlow 示意整个一条数据计算流的构造。其中每次数据在一个 Flow 上,顺次执行挂载在以后 Flow 的 Function。

2.3.1 KisFunction 家族

KisFunction 应该是一个链式调用,所以构造体的根本状态应该是一个链表,通过一次 Function 的调用完结后,默认能够调度到下一个 Function 的节点上。在 KisFlow 中,一共有 saveload, calculate, extend, varify等多种行为的 Funciton,所以这里咱们采纳上述五种 function 的模板类,不便今后在不同针对不同特色的 function 做更加灵便和性能隔离的拓展和革新。

整体的 KisFunction 的类图设计如下:

2.2.2 形象层 KisFunction 定义

kis-flow 中创立一个新的目录 function 用来寄存 function 的代码。
首先形象接口编写在 kis/ 目录下。

kis-flow/kis/function.go

package kis

import (
    "context"
    "kis-flow/config"
)

// Function 流式计算根底计算模块,KisFunction 是一条流式计算的根本计算逻辑单元,//                任意个 KisFunction 能够组合成一个 KisFlow
type Function interface {
    // Call 执行流式计算逻辑
    Call(ctx context.Context, flow Flow) error

    // SetConfig 给以后 Function 实例配置策略
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig 获取以后 Function 实例配置策略
    GetConfig() *config.KisFuncConfig

    // SetFlow 给以后 Function 实例设置所依赖的 Flow 实例
    SetFlow(f Flow) error
    // GetFlow 获取以后 Functioin 实力所依赖的 Flow
    GetFlow() Flow

    // CreateId 给以后 Funciton 实力生成一个随机的实例 KisID
    CreateId()
    // GetId 获取以后 Function 的 FID
    GetId() string
    // GetPrevId 获取以后 Function 上一个 Function 节点 FID
    GetPrevId() string
    // GetNextId 获取以后 Function 下一个 Function 节点 FID
    GetNextId() string

    // Next 返回下一层计算流 Function,如果以后层为最初一层,则返回 nil
    Next() Function
    // Prev 返回上一层计算流 Function,如果以后层为最初一层,则返回 nil
    Prev() Function
    // SetN 设置下一层 Function 实例
    SetN(f Function)
    // SetP 设置上一层 Function 实例
    SetP(f Function)
}

2.2.3 KisId 随机惟一实例 ID

上述提出了一个新的概念 KisId。KisID 为Function 的实例 ID,用于 KisFlow 外部辨别不同的实例对象。KisId 和 Function Config 中的 Fid 的区别在于,Fid 用来形容一类 Funcion 策略的 ID,而 KisId 则为在 KisFlow 中 KisFunction 曾经实例化的 实例对象 ID 这个 ID 是随机生成且惟一。

创立 kis-flow/id/ 目录,且创立 kis_id.go 文件,实现无关 kis_id 生成的算法。

kis-flow/id/kis_id.go

package id

import (
    "github.com/google/uuid"
    "kis-flow/common"
    "strings"
)

// KisID 获取一个中随机实例 ID
// 格局为  "prefix1-[prefix2-][prefix3-]ID"
// 如:flow-1234567890
// 如:func-1234567890
// 如: conn-1234567890
// 如: func-1-1234567890
func KisID(prefix ...string) (kisId string) {idStr := strings.Replace(uuid.New().String(), "-", "", -1)
    kisId = formatKisID(idStr, prefix...)

    return
}

func formatKisID(idStr string, prefix ...string) string {
    var kisId string

    for _, fix := range prefix {
        kisId += fix
        kisId += common.KisIdJoinChar
    }

    kisId += idStr

    return kisId
}

kisId模块提供 KisID() 办法,这外面依赖了第三方分布式 ID 生成库 github.com/google/uuid,生成的随机 ID 为一个字符串,且调用者能够提供多个前缀,通过- 符号进行拼接,失去的随机字符串 ID,如:func-1234567890

针对 KisId 的前缀,提供了一些字符串的枚举,如下:

kis-flow/common/const.go

// KisIdType 用户生成 KisId 的字符串前缀
const (
    KisIdTypeFlow       = "flow"
    KisIdTypeConnnector = "conn"
    KisIdTypeFunction   = "func"
    KisIdTypeGlobal     = "global"
    KisIdJoinChar       = "-"
)

2.2.4 BaseFunction 根底父类

依照设计,咱们须要提供一个 BaseFunction 作为 Function 的一个子类,实现一些根底的性能接口。留下 Call() 让具体模式的 KisFunctionX 来重写实现,上面来进行对 BaseFunction 构造的定义。
kis-flow/function/创立kis_base_funciton.go 文件。

A. 构造定义

kis-flow/function/kis_base_function.go

package function

import (
    "context"
    "errors"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
)

type BaseFunction struct {
    // Id , KisFunction 的实例 ID,用于 KisFlow 外部辨别不同的实例对象
    Id     string
    Config *config.KisFuncConfig

    // flow
    Flow kis.Flow // 上下文环境 KisFlow

    // link
    N kis.Function // 下一个流计算 Function
    P kis.Function // 上一个流计算 Function
}

B. 办法实现

kis-flow/function/kis_base_function.go

// Call
// BaseFunction 为空实现,目标为了让其余具体类型的 KisFunction,如 KisFunction_V 来继承 BaseFuncion 来重写此办法
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error {return nil}

func (base *BaseFunction) Next() kis.Function {return base.N}

func (base *BaseFunction) Prev() kis.Function {return base.P}

func (base *BaseFunction) SetN(f kis.Function) {base.N = f}

func (base *BaseFunction) SetP(f kis.Function) {base.P = f}

func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
    if s == nil {return errors.New("KisFuncConfig is nil")
    }

    base.Config = s

    return nil
}

func (base *BaseFunction) GetId() string {return base.Id}

func (base *BaseFunction) GetPrevId() string {
    if base.P == nil {
        //Function 为首结点
        return common.FunctionIdFirstVirtual
    }
    return base.P.GetId()}

func (base *BaseFunction) GetNextId() string {
    if base.N == nil {
        //Function 为尾结点
        return common.FunctionIdLastVirtual
    }
    return base.N.GetId()}

func (base *BaseFunction) GetConfig() *config.KisFuncConfig {return base.Config}

func (base *BaseFunction) SetFlow(f kis.Flow) error {
    if f == nil {return errors.New("KisFlow is nil")
    }
    base.Flow = f
    return nil
}

func (base *BaseFunction) GetFlow() kis.Flow {return base.Flow}

func (base *BaseFunction) CreateId() {base.Id = id.KisID(common.KisIdTypeFunction)
}

这里留神 GetPrevId()GetNextId()两个办法实现,因为如果以后 Functioin 为双向链表中的第一个节点或者最初一个节点,那么他们的上一个或者下一个是没有节点的,那么 ID 也就不存在,为了在应用中不呈现得不到 ID 的状况,咱们提供了两个虚构 FID,做非凡状况的边界解决,定义在 const.go 中。

kis-flow/common/const.go

const (
    // FunctionIdFirstVirtual 为首结点 Function 上一层虚构的 Function ID
    FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
    // FunctionIdLastVirtual 为尾结点 Function 下一层虚构的 Function ID
    FunctionIdLastVirtual = "FunctionIdLastVirtual"
)

2.2.5 KisFunction 的 V /S/L/C/ E 等模式类定义

上面别离实现 V /S/L/C/E 五种不同模式的 KisFunction 子类。这里别离用创立文件来实现。

A. KisFunctionV

kis-flow/function/kis_function_v.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionV struct {BaseFunction}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {fmt.Printf("KisFunctionV, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法

    return nil
}

B. KisFunctionS

kis-flow/function/kis_function_s.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionS struct {BaseFunction}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {fmt.Printf("KisFunctionS, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法

    return nil
}

C. KisFunctionL

kis-flow/function/kis_function_l.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionL struct {BaseFunction}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {fmt.Printf("KisFunctionL, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法

    return nil
}

D. KisFunctionC

kis-flow/function/kis_function_c.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionC struct {BaseFunction}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {fmt.Printf("KisFunction_C, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法

    return nil
}

E. KisFunctionE

kis-flow/function/kis_function_e.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionE struct {BaseFunction}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {fmt.Printf("KisFunctionE, flow = %+v\n", flow)

    // TODO 调用具体的 Function 执行办法

    return nil
}

2.2.6 创立 KisFunction 实例

上面提供一个创立具体模式 Function 的办法,这里采纳简略工厂办法模式来实现创建对象。

kis-flow/function/kis_base_function.go

func (base *BaseFunction) CreateId() {base.Id = id.KisID(common.KisIdTypeFunction)
}

// NewKisFunction 创立一个 NsFunction
// flow: 以后所属的 flow 实例
// s : 以后 function 的配置策略
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
    var f kis.Function

    // 工厂生产泛化对象
    switch common.KisMode(config.FMode) {
    case common.V:
        f = new(KisFunctionV)
        break
    case common.S:
        f = new(KisFunctionS)
    case common.L:
        f = new(KisFunctionL)
    case common.C:
        f = new(KisFunctionC)
    case common.E:
        f = new(KisFunctionE)
    default:
        //LOG ERROR
        return nil
    }

    // 生成随机实例惟一 ID
    f.CreateId()

    // 设置根底信息属性
    if err := f.SetConfig(config); err != nil {panic(err)
    }

    if err := f.SetFlow(flow); err != nil {panic(err)
    }

    return f
}

留神 NewKisFunction()办法返回的是一个形象的 interface Function

还要留神,目前到这里没有实现 Flow 对象,然而 KisFunciton 的创立须要依赖传递一个 Flow 对象,咱们当初能够临时简略创立一个 Flow 对象的构造方法,之后在实现 Flow 章节再欠缺这部分的代码。
kis-filw/kis/中创立 flow.go 文件。

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/config"
)

type Flow interface {// TODO}

kis-flow/flow/ 下创立 kis_flow.go 文件,实现如下:

kis-flow/flow/kis_flow.go

package flow

import "kis-flow/config"

// KisFlow 用于贯通整条流式计算的上下文环境
type KisFlow struct {
    Id   string
    Name string
    // TODO
}

// TODO for test
// NewKisFlow 创立一个 KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {flow := new(KisFlow)

    // 根底信息
    flow.Id = id.KisID(common.KisIdTypeFlow)
    flow.Name = conf.FlowName

    return flow
}

2.2.7 单元测试 KisFunction 创立实例

当初来对上述的 KisFunction 实例的创立做一个简略的单元测试,在 kis-flow/test/ 创立 kis_function_test.go 文件。

kis-flow/test/kis_function_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/function"
    "testing"
)

func TestNewKisFunction(t *testing.T) {ctx := context.Background()

    // 1. 创立一个 KisFunction 配置实例
    source := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
    if myFuncConfig1 == nil {panic("myFuncConfig1 is nil")
    }

    // 2. 创立一个 KisFlow 配置实例
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. 创立一个 KisFlow 对象
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. 创立一个 KisFunction 对象
    func1 := function.NewKisFunction(flow1, myFuncConfig1)

    if err := func1.Call(ctx, flow1); err != nil {t.Errorf("func1.Call() error = %v", err)
    }
}

流程很简略,分为四个小步骤:

  1. 创立一个 KisFunction 配置实例
  2. 创立一个 KisFlow 配置实例
  3. 创立一个 KisFlow 对象
  4. 创立一个 KisFunction 对象

cd 到 kis-flow/test/ 目录下执行:

go test -test.v -test.paniconexit0 -test.run TestNewKisFunction

后果如下:

=== RUN   TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

--- PASS: TestNewKisFunction (0.00s)
PASS
ok      kis-flow/test   1.005s

咱们曾经调用到了具体的 KisFunciton_C 实例的 Call() 办法。


作者: 刘丹冰 Aceld github: https://github.com/aceld

KisFlow 开源我的项目地址:https://github.com/aceld/kis-flow


连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)
Golang 框架实战 -KisFlow 流式计算框架(3)- 我的项目构建 / 根底模块 -(下)

退出移动版