2. V0.1-我的项目构建及根底模块定义

首先咱们创立咱们的我的项目,我的项目的主文件目录就叫KisFlow,且在Github上创立对应的仓库: https://github.com/aceld/kis-flow 而后将我的项目代码clone到本地。

2.0 我的项目构建

(这里如果你是依照本教程开发,须要在本人的仓库从新创立一个新我的项目,并且clone到本地开发)

2.0.1 创立我的项目目录

接下来,咱们先将我的项目中的必要的文件目录创立好,我的项目的目录构造如下:

 kis-flow /.├── LICENSE├── README.md├── common/├── example/├── function/├── conn/├── config/├── flow/└── kis/

这里咱们创立三个文件夹, common/为 寄存咱们一些专用的根底常量和一些枚举参数,还有一些工具类的办法。 flow/为寄存KisFlow的外围代码。 function/为寄存KisFunction的外围代码。 conn/为寄存KisConnector的外围代码。 config/ 寄存flow、functioin、connector等策略配置信息模块。 example/为咱们针对KisFlow的一些测试案例和test单元测试案例等,可能及时验证咱们的我的项目成果。 kis/来寄存所有模块的形象层。

2.0.1 创立go.mod

cd 到 kis-flow的我的项目根目录,执行如下指令:

咱们会失去go.mod文件,这个是作为以后我的项目的包管理文件,如下:

module kis-flowgo 1.18

首先因为在之后会有很多调试日志要打印,咱们先把日志模块集成了,日志模块KisFlow提供一个默认的规范输入Logger对象,再对我凋谢一个SetLogger() 办法来进行从新设置开发者本人的Logger模块。

2.1 KisLogger

2.1.1 Logger形象接口

将Logger的定义在kis-flow/log/目录下,创立kis_log.go文件:

kis-flow/log/kis_log.go
package logimport "context"type KisLogger interface {    // InfoFX 有上下文的Info级别日志接口, format字符串格局    InfoFX(ctx context.Context, str string, v ...interface{})    // ErrorFX 有上下文的Error级别日志接口, format字符串格局    ErrorFX(ctx context.Context, str string, v ...interface{})    // DebugFX 有上下文的Debug级别日志接口, format字符串格局    DebugFX(ctx context.Context, str string, v ...interface{})    // InfoF 无上下文的Info级别日志接口, format字符串格局    InfoF(str string, v ...interface{})    // ErrorF 无上下文的Error级别日志接口, format字符串格局    ErrorF(str string, v ...interface{})    // DebugF 无上下文的Debug级别日志接口, format字符串格局    DebugF(str string, v ...interface{})}// kisLog 默认的KisLog 对象var kisLog KisLogger// SetLogger 设置KisLog对象, 能够是用户自定义的Logger对象func SetLogger(newlog KisLogger) {    kisLog = newlog}// Logger 获取到kisLog对象func Logger() KisLogger {    return kisLog}

KisLogger提供了三个级别的日志,别离是Info、Error、Debug。且也别离提供了具备context参数与不具备context参数的两套日志接口。
提供一个全局对象kisLog,默认的KisLog 对象。以及办法SetLogger()Logger()供开发能够设置本人的Logger对象以及获取到Logger对象。

2.1.2 默认的日志对象KisDefaultLogger

如果开发没有自定义的日志对象定义,那么KisFlow会提供一个默认的日志对象kisDefaultLogger,这个类实现了KisLogger的全副接口,且都是默认打印到规范输入的模式来打印日志,定义在kis-flow/log/目录下,创立kis_default_log.go文件。

kis-flow/log/kis_default_log.go
package logimport (    "context"    "fmt")// kisDefaultLog 默认提供的日志对象type kisDefaultLog struct{}func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {    fmt.Printf(str, v...)}func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {    fmt.Printf(str, v...)}func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {    fmt.Printf(str, v...)}func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {    fmt.Println(ctx)    fmt.Printf(str, v...)}func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {    fmt.Println(ctx)    fmt.Printf(str, v...)}func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {    fmt.Println(ctx)    fmt.Printf(str, v...)}func init() {    // 如果没有设置Logger, 则启动时应用默认的kisDefaultLog对象    if Logger() == nil {        SetLogger(&kisDefaultLog{})    }}

这里在init()初始化办法中,会判断目前是否曾经有设置全局的Logger对象,如果没有,KisFlow会默认抉择kisDefaultLog 作为全局Logger日志对象。

2.1.3 单元测试KisLogger

当初,咱们先不针对KisLogger做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测试来测试创立一个KisLogger

kis-flow/test/kis_log_test.go
package testimport (    "context"    "kis-flow/log"    "testing")func TestKisLogger(t *testing.T) {    ctx := context.Background()    log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")    log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")    log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")    log.Logger().InfoF("TestKisLogger InfoF")    log.Logger().ErrorF("TestKisLogger ErrorF")    log.Logger().DebugF("TestKisLogger DebugF")}

咱们cdkis-flow/test/目录下执行单元测试指令:

go test -test.v -test.paniconexit0 -test.run TestKisLogger

失去后果如下:

=== RUN   TestKisLoggercontext.BackgroundTestKisLogger InfoFXcontext.BackgroundTestKisLogger ErrorFXcontext.BackgroundTestKisLogger DebugFXTestKisLogger InfoFTestKisLogger ErrorFTestKisLogger DebugF--- PASS: TestKisLogger (0.00s)PASSok      kis-flow/test   0.509s

2.2 KisConfig

在KisFlow中,咱们定义了三种外围模块,别离是KisFunction, KisFlow, KisConnector ,所以KisConfig也别离须要针对这三个模块进行定义,咱们将全副无关KisConfig的代码都放在kis-flow/config/目录下。

➜  kis-flow git:(master) ✗ tree.├── LICENSE├── README.md├── common/│   └── ├── example/│   └── ├── config/│   ├──├── test/└── go.mod

2.2.1 KisFuncConfig 定义

KisFuncConfig在设计文档中的yaml文件模式如下:

kistype: funcfname: 测试KisFunction_S1fmode: Savesource: name: 被校验的测试数据源1-用户订单维度 must: - userid - orderidoption: cname: 测试KisConnector_1 retry_times: 3 retry_duration: 500 default_params: default1: default1_param default2: default2_param

参数阐明:

接下来咱们根据上述的配置协定,来定义KisFunction的策略配置构造体,并且提供一些响应的初始化办法。 咱们在我的项目文档中创立kis_func_config.go文件,在这里咱们将须要的Config定义实现。

A. 构造体定义

kis-flow/config/kis_func_config.go
package configimport (    "kis-flow/common"    "kis-flow/log")// FParam 在以后Flow中Function定制固定配置参数类型type FParam map[string]string// KisSource 示意以后Function的业务源type KisSource struct {    Name string   `yaml:"name"` //本层Function的数据源形容    Must []string `yaml:"must"` //source必传字段}// KisFuncOption 可选配置type KisFuncOption struct {    CName        string `yaml:"cname"`           //连接器Connector名称    RetryTimes   int    `yaml:"retry_times"`     //选填,Function调度重试(不包含失常调度)最大次数    RetryDuriton int    `yaml:"return_duration"` //选填,Function调度每次重试最大工夫距离(单位:ms)    Params       FParam `yaml:"default_params"`  //选填,在以后Flow中Function定制固定配置参数}// KisFuncConfig 一个KisFunction策略配置type KisFuncConfig struct {    KisType string        `yaml:"kistype"`    FName   string        `yaml:"fname"`    FMode   string        `yaml:"fmode"`    Source  KisSource     `yaml:"source"`    Option  KisFuncOption `yaml:"option"`}

这里KisFuncConfig是相干构造体,其中 FParamKisSourceKisFuncOption均为一些相干的参数类型。

B. 相干办法定义

上面咱们先简略的提供创立KisFuncConfig的构造方法。

kis-flow/config/kis_func_config.go
// NewFuncConfig 创立一个Function策略配置对象, 用于形容一个KisFunction信息func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {     config := new(KisFuncConfig)     config.FName = funcName     if source == nil {         log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)         return nil     }     config.Source = *source     config.FMode = string(mode)     //FunctionS 和 L 须要必传KisConnector参数,起因是S和L须要通过Connector进行建设流式关系     if mode == common.S || mode == common.L {             if option == nil {                   log.Logger().ErrorF("Funcion S/L need option->Cid\n")                   return nil             } else if option.CName == "" {                   log.Logger().ErrorF("Funcion S/L need option->Cid\n")                   return nil             }       }      if option != nil {           config.Option = *option      }     return config}

上述代码中提到了common.Scommon.L两个枚举类型,这是咱们针对KisFunction提供的五种类型的枚举值,咱们能够将他们定义在 kis-flow/common/const.go文件中。

kis-flow/common/const.go
package commontype KisMode stringconst (    // V 为校验特色的KisFunction,     // 次要进行数据的过滤,验证,字段梳理,幂等等前置数据处理    V KisMode = "Verify"    // S 为存储特色的KisFunction,     // S会通过NsConnector进行将数据进行存储,数据的长期申明周期为NsWindow    S KisMode = "Save"    // L 为加载特色的KisFunction,    // L会通过KisConnector进行数据加载,通过该Function能够从逻辑上与对应的S Function进行并流    L KisMode = "Load"    // C 为计算特色的KisFunction,     // C会通过KisFlow中的数据计算,生成新的字段,将数据流传递给上游S进行存储,或者本人也已间接通过KisConnector进行存储    C KisMode = "Calculate"    // E 为扩大特色的KisFunction,    // 作为流式计算的自定义特色Function,如,Notify 调度器触发工作的音讯发送,删除一些数据,重置状态等。    E KisMode = "Expand")

如果fmodeSave或者Load阐明这个function有查问库或者存储数据的行为,那么这个Function就须要关联一个KisConnector,那么CName就须要传递进来。

C. 创立KisFuncConfig单元测试

当初,咱们先不针对KisFuncConfig做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测试来测试创立一个KisFuncConfig

kis-flow/test/kis_config_test.go
func TestNewFuncConfig(t *testing.T) {    source := config.KisSource{        Name: "公众号抖音商城户订单数据",        Must: []string{"order_id", "user_id"},    }    option := config.KisFuncOption{        CName:        "connectorName1",        RetryTimes:   3,        RetryDuriton: 300,        Params: config.FParam{            "param1": "value1",            "param2": "value2",        },    }    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)    log.Logger().InfoF("funcName1: %+v\n", myFunc1)}

咱们cdkis-flow/test/目录下执行单元测试指令:

go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig

失去后果如下:

=== RUN   TestNewFuncConfigfuncName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:公众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}--- PASS: TestNewFuncConfig (0.00s)PASSok      kis-flow/test   0.545s

好了,当初最简略的KisFuncConfig的策略创立根本实现了。

2.2.2 KisFlowConfig 定义

KisFlowConfig在设计文档中的yaml文件模式如下:

kistype: flowstatus: 1flow_name: MyFlow1flows:  - fname: 测试PrintInput    params:      args1: value1      args2: value2  - fname: 测试KisFunction_S1  - fname: 测试PrintInput    params:      args1: value11      args2: value22      default2: newDefault  - fname: 测试PrintInput  - fname: 测试KisFunction_S1    params:      my_user_param1: ffffffxxxxxx  - fname: 测试PrintInput

参数阐明:

A. 构造体定义

接下来咱们根据上述的配置协定,来定义KisFlow的策略配置构造体,并且提供一些响应的初始化办法。 咱们在我的项目文档中创立kis_flow_config.go文件,在这里咱们将须要的Config定义实现。

kis-flow/config/kis_flow_config.go
package configimport "kis-flow/common"// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数type KisFlowFunctionParam struct {    FuncName string `yaml:"fname"`  //必须    Params   FParam `yaml:"params"` //选填,在以后Flow中Function定制固定配置参数}// KisFlowConfig 用户贯通整条流式计算上下文环境的对象type KisFlowConfig struct {    KisType  string                 `yaml:"kistype"`    Status   int                    `yaml:"status"`    FlowName string                 `yaml:"flow_name"`    Flows    []KisFlowFunctionParam `yaml:"flows"`}

这里提供了一个新的参数类型 KisFlowFunctionParam ,这个示意配置KisFlow的时候,在调度的时候,flow默认传递以后被调度Function的自定义默认参数,如果不须要能够不增加此参数。

B. 相干办法定义

提供一个新建KisFlowConfig的构造方法。

kis-flow/config/kis_flow_config.go
// NewFlowConfig 创立一个Flow策略配置对象, 用于形容一个KisFlow信息func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {    config := new(KisFlowConfig)    config.FlowName = flowName    config.Flows = make([]KisFlowFunctionParam, 0)    config.Status = int(enable)    return config}// AppendFunctionConfig 增加一个Function Config 到以后Flow中func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {    fConfig.Flows = append(fConfig.Flows, params)}

无关flow携带的Function配置,这里咱们采纳通过AppendFunctionConfig动静的去增加,目标是为了,今后可能无关kisflow的配置会从数据库/动静近程配置等中提取,那么就须要动静的将配置组合进来。

C. KisFlowConfig单元测试

同样,咱们简略些一个单元测试来测试KisFlowConfig的创立。

kis-flow/test/kis_config_test.go
func TestNewFlowConfig(t *testing.T) {    flowFuncParams1 := config.KisFlowFunctionParam{        FuncName: "funcName1",        Params: config.FParam{            "flowSetFunParam1": "value1",            "flowSetFunParam2": "value2",        },    }    flowFuncParams2 := config.KisFlowFunctionParam{        FuncName: "funcName2",        Params: config.FParam{            "default": "value1",        },    }    myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)    myFlow1.AppendFunctionConfig(flowFuncParams1)    myFlow1.AppendFunctionConfig(flowFuncParams2)    log.Logger().InfoF("myFlow1: %+v\n", myFlow1)}

咱们cdkis-flow/test/目录下执行单元测试指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig

失去后果如下:

=== RUN   TestNewFlowConfigmyFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}--- PASS: TestNewFlowConfig (0.00s)PASSok      kis-flow/test   0.251s

2.2.3 KisConnConfig

KisConnConfig在设计文档中的yaml文件模式如下:

kistype: conncname: 测试KisConnector_1addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'type: rediskey: userid_orderid_optionparams:  args1: value1  args2: value2load: nullsave:  - 测试KisFunction_S1

A. 构造体定义

接下来咱们根据上述的配置协定,来定义KisConnector的策略配置构造体,并且提供一些响应的初始化办法。 咱们在我的项目文档中创立kis_conn_config.go文件,在这里咱们将须要的Config定义实现。

kis-flow/config/kis_conn_config.go
package configimport (    "errors"    "fmt"    "kis-flow/common")// KisConnConfig KisConnector 策略配置type KisConnConfig struct {    //配置类型    KisType string `yaml:"kistype"`    //惟一形容标识    CName string `yaml:"cname"`    //根底存储媒介地址    AddrString string `yaml:"addrs"`    //存储媒介引擎类型"Mysql" "Redis" "Kafka"等    Type common.KisConnType `yaml:"type"`    //一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等    Key string `yaml:"key"`    //配置信息中的自定义参数    Params map[string]string `yaml:"params"`    //存储读取所绑定的NsFuncionID    Load []string `yaml:"load"`    Save []string `yaml:"save"`}

B. 相干办法定义

kis-flow/config/kis_conn_config.go
// NewConnConfig 创立一个KisConnector策略配置对象, 用于形容一个KisConnector信息func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {    strategy := new(KisConnConfig)    strategy.CName = cName    strategy.AddrString = addr    strategy.Type = t    strategy.Key = key    strategy.Params = param    return strategy}// WithFunc Connector与Function进行关系绑定func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {    switch common.KisMode(fConfig.FMode) {    case common.S:        cConfig.Save = append(cConfig.Save, fConfig.FName)    case common.L:        cConfig.Load = append(cConfig.Load, fConfig.FName)    default:        return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))    }    return nil}

这里也是通过提供WithFunc办法来动静的增加Conn和Function的关联关系 ###

C. KisConnConfig 单元测试 同样,咱们简略些一个单元测试来测试KisConnConfig的创立。

kis-flow/test/kis_config_test.go
func TestNewConnConfig(t *testing.T) {    source := config.KisSource{        Name: "公众号抖音商城户订单数据",        Must: []string{"order_id", "user_id"},    }    option := config.KisFuncOption{        CName:        "connectorName1",        RetryTimes:   3,        RetryDuriton: 300,        Params: config.FParam{            "param1": "value1",            "param2": "value2",        },    }    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)    connParams := config.FParam{        "param1": "value1",        "param2": "value2",    }    myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)    if err := myConnector1.WithFunc(myFunc1); err != nil {        log.Logger().ErrorF("WithFunc err: %s\n", err.Error())    }    log.Logger().InfoF("myConnector1: %+v\n", myConnector1)}

咱们cdkis-fow/test/目录下执行单元测试指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig

失去后果如下:

=== RUN   TestNewConnConfigmyConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}--- PASS: TestNewConnConfig (0.00s)PASSok      kis-flow/test   0.481s

作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源我的项目地址:https://github.com/aceld/kis-flow

连载中...
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-我的项目构建/根底模块-(上)