关于go:Golang框架实战KisFlow流式计算框架2项目构建基础模块上

48次阅读

共计 12640 个字符,预计需要花费 32 分钟才能阅读完成。

2. V0.1- 我的项目构建及根底模块定义

首先咱们创立咱们的我的项目,我的项目的主文件目录就叫 KisFlow,且在 Github 上创立对应的仓库:https://github.com/aceld/kis-flow 而后将我的项目代码 clone 到本地。

2.0 我的项目构建

(这里如果你是依照本教程开发,须要在本人的仓库从新创立一个新我的项目,并且 clone 到本地开发)

2.0.1 创立我的项目目录

接下来,咱们先将我的项目中的必要的文件目录创立好,我的项目的目录构造如下:

 kis-flow /
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/

这里咱们创立三个文件夹,common/为 寄存咱们一些专用的根底常量和一些枚举参数,还有一些工具类的办法。flow/为寄存 KisFlow 的外围代码。function/为寄存 KisFunction 的外围代码。conn/为寄存 KisConnector 的外围代码。config/ 寄存 flow、functioin、connector 等策略配置信息模块。example/为咱们针对 KisFlow 的一些测试案例和 test 单元测试案例等,可能及时验证咱们的我的项目成果。kis/来寄存所有模块的形象层。

2.0.1 创立 go.mod

cd 到 kis-flow 的我的项目根目录,执行如下指令:

咱们会失去 go.mod 文件,这个是作为以后我的项目的包管理文件,如下:

module kis-flow
go 1.18

首先因为在之后会有很多调试日志要打印,咱们先把日志模块集成了,日志模块 KisFlow 提供一个默认的规范输入 Logger 对象,再对我凋谢一个 SetLogger() 办法来进行从新设置开发者本人的 Logger 模块。

2.1 KisLogger

2.1.1 Logger 形象接口

将 Logger 的定义在 kis-flow/log/ 目录下,创立 kis_log.go 文件:

kis-flow/log/kis_log.go

package log

import "context"

type KisLogger interface {
    // InfoFX 有上下文的 Info 级别日志接口, format 字符串格局
    InfoFX(ctx context.Context, str string, v ...interface{})
    // ErrorFX 有上下文的 Error 级别日志接口, format 字符串格局
    ErrorFX(ctx context.Context, str string, v ...interface{})
    // DebugFX 有上下文的 Debug 级别日志接口, format 字符串格局
    DebugFX(ctx context.Context, str string, v ...interface{})

    // InfoF 无上下文的 Info 级别日志接口, format 字符串格局
    InfoF(str string, v ...interface{})
    // ErrorF 无上下文的 Error 级别日志接口, format 字符串格局
    ErrorF(str string, v ...interface{})
    // DebugF 无上下文的 Debug 级别日志接口, format 字符串格局
    DebugF(str string, v ...interface{})
}

// kisLog 默认的 KisLog 对象
var kisLog KisLogger

// SetLogger 设置 KisLog 对象, 能够是用户自定义的 Logger 对象
func SetLogger(newlog KisLogger) {kisLog = newlog}

// Logger 获取到 kisLog 对象
func Logger() KisLogger {return kisLog}

KisLogger 提供了三个级别的日志,别离是 Info、Error、Debug。且也别离提供了具备 context 参数与不具备 context 参数的两套日志接口。
提供一个全局对象 kisLog,默认的 KisLog 对象。以及办法SetLogger()Logger()供开发能够设置本人的 Logger 对象以及获取到 Logger 对象。

2.1.2 默认的日志对象 KisDefaultLogger

如果开发没有自定义的日志对象定义,那么 KisFlow 会提供一个默认的日志对象 kisDefaultLogger,这个类实现了KisLogger 的全副接口,且都是默认打印到规范输入的模式来打印日志,定义在 kis-flow/log/ 目录下,创立 kis_default_log.go 文件。

kis-flow/log/kis_default_log.go

package log

import (
    "context"
    "fmt"
)

// kisDefaultLog 默认提供的日志对象
type kisDefaultLog struct{}

func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {fmt.Printf(str, v...)
}

func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {fmt.Println(ctx)
    fmt.Printf(str, v...)
}

func init() {
    // 如果没有设置 Logger, 则启动时应用默认的 kisDefaultLog 对象
    if Logger() == nil {SetLogger(&kisDefaultLog{})
    }
}

这里在 init() 初始化办法中,会判断目前是否曾经有设置全局的 Logger 对象,如果没有,KisFlow 会默认抉择 kisDefaultLog 作为全局 Logger 日志对象。

2.1.3 单元测试 KisLogger

当初,咱们先不针对 KisLogger 做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测试来测试创立一个KisLogger

kis-flow/test/kis_log_test.go

package test

import (
    "context"
    "kis-flow/log"
    "testing"
)

func TestKisLogger(t *testing.T) {ctx := context.Background()

    log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
    log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
    log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")

    log.Logger().InfoF("TestKisLogger InfoF")
    log.Logger().ErrorF("TestKisLogger ErrorF")
    log.Logger().DebugF("TestKisLogger DebugF")
}

咱们 cdkis-flow/test/目录下执行单元测试指令:

go test -test.v -test.paniconexit0 -test.run TestKisLogger

失去后果如下:

=== RUN   TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok      kis-flow/test   0.509s

2.2 KisConfig

在 KisFlow 中,咱们定义了三种外围模块,别离是 KisFunction, KisFlow, KisConnector,所以 KisConfig 也别离须要针对这三个模块进行定义,咱们将全副无关 KisConfig 的代码都放在kis-flow/config/ 目录下。

➜  kis-flow git:(master) ✗ tree
.
├── LICENSE
├── README.md
├── common/
│   └── 
├── example/
│   └── 
├── config/
│   ├──
├── test/
└── go.mod

2.2.1 KisFuncConfig 定义

KisFuncConfig 在设计文档中的 yaml 文件模式如下:

kistype: func
fname: 测试 KisFunction_S1
fmode: Save
source:
 name: 被校验的测试数据源 1 - 用户订单维度
 must:
 - userid
 - orderid

option:
 cname: 测试 KisConnector_1
 retry_times: 3
 retry_duration: 500
 default_params:
 default1: default1_param
 default2: default2_param

参数阐明:

接下来咱们根据上述的配置协定,来定义 KisFunction 的策略配置构造体,并且提供一些响应的初始化办法。咱们在我的项目文档中创立 kis_func_config.go 文件,在这里咱们将须要的 Config 定义实现。

A. 构造体定义

kis-flow/config/kis_func_config.go

package config

import (
    "kis-flow/common"
    "kis-flow/log"
)

// FParam 在以后 Flow 中 Function 定制固定配置参数类型
type FParam map[string]string

// KisSource 示意以后 Function 的业务源
type KisSource struct {
    Name string   `yaml:"name"` // 本层 Function 的数据源形容
    Must []string `yaml:"must"` //source 必传字段}

// KisFuncOption 可选配置
type KisFuncOption struct {
    CName        string `yaml:"cname"`           // 连接器 Connector 名称
    RetryTimes   int    `yaml:"retry_times"`     // 选填,Function 调度重试 (不包含失常调度) 最大次数
    RetryDuriton int    `yaml:"return_duration"` // 选填,Function 调度每次重试最大工夫距离(单位:ms)
    Params       FParam `yaml:"default_params"`  // 选填, 在以后 Flow 中 Function 定制固定配置参数
}

// KisFuncConfig 一个 KisFunction 策略配置
type KisFuncConfig struct {
    KisType string        `yaml:"kistype"`
    FName   string        `yaml:"fname"`
    FMode   string        `yaml:"fmode"`
    Source  KisSource     `yaml:"source"`
    Option  KisFuncOption `yaml:"option"`
}

这里 KisFuncConfig 是相干构造体,其中 FParamKisSourceKisFuncOption均为一些相干的参数类型。

B. 相干办法定义

上面咱们先简略的提供创立 KisFuncConfig 的构造方法。

kis-flow/config/kis_func_config.go

// NewFuncConfig 创立一个 Function 策略配置对象, 用于形容一个 KisFunction 信息
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {config := new(KisFuncConfig)
     config.FName = funcName

     if source == nil {log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
         return nil
     }

     config.Source = *source
     config.FMode = string(mode)

     //FunctionS 和 L 须要必传 KisConnector 参数, 起因是 S 和 L 须要通过 Connector 进行建设流式关系
     if mode == common.S || mode == common.L {
             if option == nil {log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                   return nil
             } else if option.CName == "" {log.Logger().ErrorF("Funcion S/L need option->Cid\n")
                   return nil
             }
       }

      if option != nil {config.Option = *option}

     return config
}

上述代码中提到了 common.Scommon.L两个枚举类型,这是咱们针对 KisFunction 提供的五种类型的枚举值,咱们能够将他们定义在 kis-flow/common/const.go文件中。

kis-flow/common/const.go

package common

type KisMode string

const (
    // V 为校验特色的 KisFunction, 
    // 次要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
    V KisMode = "Verify"

    // S 为存储特色的 KisFunction, 
    // S 会通过 NsConnector 进行将数据进行存储,数据的长期申明周期为 NsWindow
    S KisMode = "Save"

    // L 为加载特色的 KisFunction,// L 会通过 KisConnector 进行数据加载,通过该 Function 能够从逻辑上与对应的 S Function 进行并流
    L KisMode = "Load"

    // C 为计算特色的 KisFunction, 
    // C 会通过 KisFlow 中的数据计算,生成新的字段,将数据流传递给上游 S 进行存储,或者本人也已间接通过 KisConnector 进行存储
    C KisMode = "Calculate"

    // E 为扩大特色的 KisFunction,// 作为流式计算的自定义特色 Function,如,Notify 调度器触发工作的音讯发送,删除一些数据,重置状态等。E KisMode = "Expand"
)

如果 fmodeSave或者 Load 阐明这个 function 有查问库或者存储数据的行为,那么这个 Function 就须要关联一个 KisConnector,那么 CName 就须要传递进来。

C. 创立 KisFuncConfig 单元测试

当初,咱们先不针对 KisFuncConfig 做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测试来测试创立一个KisFuncConfig

kis-flow/test/kis_config_test.go

func TestNewFuncConfig(t *testing.T) {
    source := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    option := config.KisFuncOption{
        CName:        "connectorName1",
        RetryTimes:   3,
        RetryDuriton: 300,

        Params: config.FParam{
            "param1": "value1",
            "param2": "value2",
        },
    }

    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

    log.Logger().InfoF("funcName1: %+v\n", myFunc1)
}

咱们 cdkis-flow/test/目录下执行单元测试指令:

go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig

失去后果如下:

=== RUN   TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name: 公众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}

--- PASS: TestNewFuncConfig (0.00s)
PASS
ok      kis-flow/test   0.545s

好了,当初最简略的 KisFuncConfig 的策略创立根本实现了。

2.2.2 KisFlowConfig 定义

KisFlowConfig 在设计文档中的 yaml 文件模式如下:

kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: 测试 PrintInput
    params:
      args1: value1
      args2: value2
  - fname: 测试 KisFunction_S1
  - fname: 测试 PrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: 测试 PrintInput
  - fname: 测试 KisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: 测试 PrintInput

参数阐明:

A. 构造体定义

接下来咱们根据上述的配置协定,来定义 KisFlow 的策略配置构造体,并且提供一些响应的初始化办法。咱们在我的项目文档中创立 kis_flow_config.go 文件,在这里咱们将须要的 Config 定义实现。

kis-flow/config/kis_flow_config.go

package config

import "kis-flow/common"

// KisFlowFunctionParam 一个 Flow 配置中 Function 的 Id 及携带固定配置参数
type KisFlowFunctionParam struct {
    FuncName string `yaml:"fname"`  // 必须
    Params   FParam `yaml:"params"` // 选填, 在以后 Flow 中 Function 定制固定配置参数
}

// KisFlowConfig 用户贯通整条流式计算上下文环境的对象
type KisFlowConfig struct {
    KisType  string                 `yaml:"kistype"`
    Status   int                    `yaml:"status"`
    FlowName string                 `yaml:"flow_name"`
    Flows    []KisFlowFunctionParam `yaml:"flows"`}

这里提供了一个新的参数类型 KisFlowFunctionParam,这个示意配置 KisFlow 的时候,在调度的时候,flow 默认传递以后被调度 Function 的自定义默认参数,如果不须要能够不增加此参数。

B. 相干办法定义

提供一个新建 KisFlowConfig 的构造方法。

kis-flow/config/kis_flow_config.go

// NewFlowConfig 创立一个 Flow 策略配置对象, 用于形容一个 KisFlow 信息
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {config := new(KisFlowConfig)
    config.FlowName = flowName
    config.Flows = make([]KisFlowFunctionParam, 0)

    config.Status = int(enable)

    return config
}

// AppendFunctionConfig 增加一个 Function Config 到以后 Flow 中
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {fConfig.Flows = append(fConfig.Flows, params)
}

无关 flow 携带的 Function 配置,这里咱们采纳通过 AppendFunctionConfig 动静的去增加,目标是为了,今后可能无关 kisflow 的配置会从数据库 / 动静近程配置等中提取,那么就须要动静的将配置组合进来。

C. KisFlowConfig 单元测试

同样,咱们简略些一个单元测试来测试 KisFlowConfig 的创立。

kis-flow/test/kis_config_test.go


func TestNewFlowConfig(t *testing.T) {

    flowFuncParams1 := config.KisFlowFunctionParam{
        FuncName: "funcName1",
        Params: config.FParam{
            "flowSetFunParam1": "value1",
            "flowSetFunParam2": "value2",
        },
    }

    flowFuncParams2 := config.KisFlowFunctionParam{
        FuncName: "funcName2",
        Params: config.FParam{"default": "value1",},
    }

    myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
    myFlow1.AppendFunctionConfig(flowFuncParams1)
    myFlow1.AppendFunctionConfig(flowFuncParams2)

    log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
}

咱们 cdkis-flow/test/目录下执行单元测试指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig

失去后果如下:

=== RUN   TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}

--- PASS: TestNewFlowConfig (0.00s)
PASS
ok      kis-flow/test   0.251s

2.2.3 KisConnConfig

KisConnConfig 在设计文档中的 yaml 文件模式如下:

kistype: conn
cname: 测试 KisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - 测试 KisFunction_S1

A. 构造体定义

接下来咱们根据上述的配置协定,来定义 KisConnector 的策略配置构造体,并且提供一些响应的初始化办法。咱们在我的项目文档中创立 kis_conn_config.go 文件,在这里咱们将须要的 Config 定义实现。

kis-flow/config/kis_conn_config.go

package config

import (
    "errors"
    "fmt"
    "kis-flow/common"
)

// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
    // 配置类型
    KisType string `yaml:"kistype"`
    // 惟一形容标识
    CName string `yaml:"cname"`
    // 根底存储媒介地址
    AddrString string `yaml:"addrs"`
    // 存储媒介引擎类型 "Mysql" "Redis" "Kafka" 等
    Type common.KisConnType `yaml:"type"`
    // 一次存储的标识:如 Redis 为 Key 名称、Mysql 为 Table 名称,Kafka 为 Topic 名称等
    Key string `yaml:"key"`
    // 配置信息中的自定义参数
    Params map[string]string `yaml:"params"`
    // 存储读取所绑定的 NsFuncionID
    Load []string `yaml:"load"`
    Save []string `yaml:"save"`}

B. 相干办法定义

kis-flow/config/kis_conn_config.go

// NewConnConfig 创立一个 KisConnector 策略配置对象, 用于形容一个 KisConnector 信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {strategy := new(KisConnConfig)
    strategy.CName = cName
    strategy.AddrString = addr

    strategy.Type = t
    strategy.Key = key
    strategy.Params = param

    return strategy
}

// WithFunc Connector 与 Function 进行关系绑定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {switch common.KisMode(fConfig.FMode) {
    case common.S:
        cConfig.Save = append(cConfig.Save, fConfig.FName)
    case common.L:
        cConfig.Load = append(cConfig.Load, fConfig.FName)
    default:
        return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
    }

    return nil
}

这里也是通过提供 WithFunc 办法来动静的增加 Conn 和 Function 的关联关系 ###

C. KisConnConfig 单元测试 同样,咱们简略些一个单元测试来测试 KisConnConfig 的创立。

kis-flow/test/kis_config_test.go

func TestNewConnConfig(t *testing.T) {

    source := config.KisSource{
        Name: "公众号抖音商城户订单数据",
        Must: []string{"order_id", "user_id"},
    }

    option := config.KisFuncOption{
        CName:        "connectorName1",
        RetryTimes:   3,
        RetryDuriton: 300,

        Params: config.FParam{
            "param1": "value1",
            "param2": "value2",
        },
    }

    myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)

    connParams := config.FParam{
        "param1": "value1",
        "param2": "value2",
    }

    myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)

    if err := myConnector1.WithFunc(myFunc1); err != nil {log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
    }

    log.Logger().InfoF("myConnector1: %+v\n", myConnector1)
}

咱们 cdkis-fow/test/目录下执行单元测试指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig

失去后果如下:

=== RUN   TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}

--- PASS: TestNewConnConfig (0.00s)
PASS
ok      kis-flow/test   0.481s

作者: 刘丹冰 Aceld github: https://github.com/aceld

KisFlow 开源我的项目地址:https://github.com/aceld/kis-flow

连载中 …
Golang 框架实战 -KisFlow 流式计算框架(1)- 概述
Golang 框架实战 -KisFlow 流式计算框架(2)- 我的项目构建 / 根底模块 -(上)

正文完
 0