Cloud Native: Dapr Source Code Walkthrough, Adding and Starting a New Component in Five Minutes

Today I'd like to share a short tutorial on how to write a new component yourself and load it into the Dapr runtime.

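Prerequisites

  1. For how the Dapr runtime starts up and how it loads and instantiates components, see my earlier article "Dapr runtime startup process".
  2. For setting up a Dapr development environment, see "Dapr development environment setup".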

Start coding

Clone Dapr and components-contrib

cd $GOPATH/src

# Clone dapr
mkdir -p github.com/dapr/dapr
git clone https://github.com/dapr/dapr.git github.com/dapr/dapr

# Clone components-contrib
mkdir -p github.com/dapr/components-contrib
git clone https://github.com/dapr/components-contrib.git github.com/dapr/components-contrib

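Write the component

Suppose our component is called Mytest.
First, create a mytest package in the components-contrib project.

  1. In the mytest folder, create the interface definition file: github.com/dapr/components-contrib/mytest/mytest.go
    The mytest component provides two methods, Init and Info.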
package mytest

type Mytest interface {
    // Init this component.
    Init(metadata Metadata)

    // Info show method
    Info(info string)
}
  2. metadata.go: receives the Metadata fields defined in the component's YAML configuration file
package mytest

type Metadata struct {
    Properties map[string]string `json:"properties"`
}

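  3. Provide two implementations of the mytest component: demof and demos
Create a demof package and a demos package inside the mytest package, and implement the Mytest component in demof.go and demos.go respectively.
demof.go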

package demof

import (
    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/kit/logger"
)

type Demof struct {
    logger logger.Logger
}

func NewDemof(logger logger.Logger) *Demof {
    return &Demof{logger: logger}
}

// Init logs the metadata passed in from the component YAML.
func (d *Demof) Init(metadata mytest.Metadata) {
    d.logger.Info(metadata)
}

// Info logs the received message. Infof is used so that the %s verb is formatted.
func (d *Demof) Info(info string) {
    d.logger.Infof("this is Demof, I received %s", info)
}

demos.go

package demos

import (
    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/kit/logger"
)

type Demos struct {
    logger logger.Logger
}

func NewDemos(logger logger.Logger) *Demos {
    return &Demos{logger: logger}
}

// Init logs the metadata passed in from the component YAML.
func (d *Demos) Init(metadata mytest.Metadata) {
    d.logger.Info(metadata)
}

// Info logs the received message. Infof is used so that the %s verb is formatted.
func (d *Demos) Info(info string) {
    d.logger.Infof("this is Demos, I received %s", info)
}
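
Both implementations have to satisfy the same Mytest interface. The following is a minimal, self-contained sketch of that contract. It uses local stand-ins for mytest.Metadata and mytest.Mytest, and a hypothetical recorder type that collects messages in a slice instead of calling the Dapr logger, so it runs without the Dapr repositories.

```go
package main

import (
    "fmt"
    "strings"
)

// Metadata is a local stand-in for the article's mytest.Metadata.
type Metadata struct {
    Properties map[string]string
}

// Mytest is a local stand-in for the article's mytest.Mytest interface.
type Mytest interface {
    Init(metadata Metadata)
    Info(info string)
}

// recorder is a hypothetical implementation that stores messages
// in memory instead of writing them to a logger.
type recorder struct {
    name  string
    lines []string
}

func (r *recorder) Init(metadata Metadata) {
    r.lines = append(r.lines, fmt.Sprintf("%s init with %d properties", r.name, len(metadata.Properties)))
}

func (r *recorder) Info(info string) {
    r.lines = append(r.lines, fmt.Sprintf("this is %s, I received %s", r.name, info))
}

// Compile-time check that *recorder satisfies Mytest.
var _ Mytest = (*recorder)(nil)

// run drives the component through the interface and returns what it recorded.
func run() string {
    rec := &recorder{name: "Demof"}
    var c Mytest = rec
    c.Init(Metadata{Properties: map[string]string{"mName": "mValue"}})
    c.Info("hello")
    return strings.Join(rec.lines, "\n")
}

func main() {
    fmt.Println(run())
}
```

The `var _ Mytest = (*recorder)(nil)` line is a common Go idiom: if a method is missing or has the wrong signature, the build fails at that line rather than at the first call site.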

At this point the Mytest component has everything it needs to run. The directory structure is as follows:

components-contrib
    |_mytest
        |_demof
            |_demof.go
        |_demos
            |_demos.go
        metadata.go
        mytest.go

Register the Mytest component with the Dapr runtime

  1. Implement the Mytest component registration interface: github.com/dapr/dapr/pkg/components/mytest/registry.go
package mytest

import (
    "strings"

    "github.com/pkg/errors"

    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/dapr/pkg/components"
)

type Mytest struct {
    Name          string
    FactoryMethod func() mytest.Mytest
}

func New(name string, factoryMethod func() mytest.Mytest) Mytest {
    return Mytest{
        Name:          name,
        FactoryMethod: factoryMethod,
    }
}

type Registry interface {
    Register(components ...Mytest)
    Create(name, version string) (mytest.Mytest, error)
}

type mytestRegistry struct {
    mytests map[string]func() mytest.Mytest
}

func NewRegistry() Registry {
    return &mytestRegistry{
        mytests: map[string]func() mytest.Mytest{},
    }
}

func (t *mytestRegistry) Register(components ...Mytest) {
    for _, component := range components {
        t.mytests[createFullName(component.Name)] = component.FactoryMethod
    }
}

func (t *mytestRegistry) Create(name, version string) (mytest.Mytest, error) {
    if method, ok := t.getMytest(name, version); ok {
        return method(), nil
    }
    return nil, errors.Errorf("couldn't find Mytest %s/%s", name, version)
}

func (t *mytestRegistry) getMytest(name, version string) (func() mytest.Mytest, bool) {
    nameLower := strings.ToLower(name)
    versionLower := strings.ToLower(version)
    mytestFn, ok := t.mytests[nameLower+"/"+versionLower]
    if ok {
        return mytestFn, true
    }
    if components.IsInitialVersion(versionLower) {
        mytestFn, ok = t.mytests[nameLower]
    }
    return mytestFn, ok
}

func createFullName(name string) string {
    return strings.ToLower("mytest." + name)
}
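
The lookup order in getMytest is easy to miss: the registry first tries the key "name/version" and only then falls back to the unversioned key when the version counts as the initial one. Below is a self-contained sketch of that behaviour. The factory type returns a plain string instead of a component, and isInitialVersion is a simplified stand-in that assumes only "" and "v1" are initial versions; the real components.IsInitialVersion may accept more.

```go
package main

import (
    "fmt"
    "strings"
)

// factory is a stand-in for func() mytest.Mytest; it returns a label
// so the sketch does not need the Dapr packages.
type factory func() string

type registry struct {
    factories map[string]factory
}

func newRegistry() *registry {
    return &registry{factories: map[string]factory{}}
}

// register stores the factory under the lower-cased full name "mytest.<name>".
func (r *registry) register(name string, f factory) {
    r.factories[strings.ToLower("mytest."+name)] = f
}

// isInitialVersion is a simplified assumption, not Dapr's implementation.
func isInitialVersion(version string) bool {
    return version == "" || version == "v1"
}

// create tries "name/version" first, then the unversioned name
// when the version is an initial one.
func (r *registry) create(name, version string) (string, error) {
    nameLower := strings.ToLower(name)
    versionLower := strings.ToLower(version)
    if f, ok := r.factories[nameLower+"/"+versionLower]; ok {
        return f(), nil
    }
    if isInitialVersion(versionLower) {
        if f, ok := r.factories[nameLower]; ok {
            return f(), nil
        }
    }
    return "", fmt.Errorf("couldn't find Mytest %s/%s", name, version)
}

func main() {
    r := newRegistry()
    r.register("demof", func() string { return "demof instance" })

    got, err := r.create("mytest.demof", "v1")
    fmt.Println(got, err)

    _, err = r.create("mytest.demof", "v2")
    fmt.Println(err)
}
```

Under these assumptions, a component YAML with type mytest.demof and version v1 resolves through the fallback branch, because Register never stores a key with a version suffix.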

  2. Update the runtime's component discovery and registration mechanism
github.com/dapr/dapr/pkg/runtime/options.go
Extend runtimeOpts

import (
    "github.com/dapr/dapr/pkg/components/mytest"
)

runtimeOpts struct {
    ...
    mytests     []mytest.Mytest
}

Add the runtime component registration function

func WithMytests(mytests ...mytest.Mytest) Option {
    return func(o *runtimeOpts) {
        o.mytests = append(o.mytests, mytests...)
    }
}
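
WithMytests follows the functional options pattern: each option is a function that mutates the options struct, and the runtime applies them in order. Here is a minimal standalone sketch of the pattern. The component and options types and the apply helper are hypothetical stand-ins, not Dapr's own types.

```go
package main

import "fmt"

// component is a stand-in for the registry entry (name plus factory).
type component struct {
    Name string
}

// options is a stand-in for runtimeOpts.
type options struct {
    mytests []component
}

// Option mutates the options struct.
type Option func(*options)

// WithMytests appends components, so it can be passed more than once.
func WithMytests(cs ...component) Option {
    return func(o *options) {
        o.mytests = append(o.mytests, cs...)
    }
}

// apply is a hypothetical helper that runs every option against a fresh struct.
func apply(opts ...Option) *options {
    o := &options{}
    for _, opt := range opts {
        opt(o)
    }
    return o
}

func main() {
    o := apply(
        WithMytests(component{Name: "demof"}),
        WithMytests(component{Name: "demos"}),
    )
    for _, c := range o.mytests {
        fmt.Println(c.Name)
    }
}
```

Because the option appends rather than assigns, registering demof and demos in one call or in two separate calls gives the same result.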

Update the runtime startup flow to register the Mytest component
github.com/dapr/dapr/pkg/runtime/runtime.go

import (
    ....
    "github.com/dapr/components-contrib/mytest"
    mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)

...
// Update the declarations
const (
    mytestComponent ComponentCategory = "mytest"
)
var componentCategoriesNeedProcess = []ComponentCategory{
    ...
    mytestComponent,
}
...
// Extend DaprRuntime, which holds the components of the runtime.
type DaprRuntime struct {
    ...
    mytestRegistry mytest_loader.Registry
    mytests        map[string]mytest.Mytest
}

// NewDaprRuntime returns a new runtime with the given runtime config and global config.
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList, resiliencyProvider resiliency.Provider) *DaprRuntime {
    ctx, cancel := context.WithCancel(context.Background())
    return &DaprRuntime{
        ...
        mytestRegistry: mytest_loader.NewRegistry(),
        mytests:        map[string]mytest.Mytest{},
    }
}

...
// Initialize the runtime
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ...
    a.mytestRegistry.Register(opts.mytests...)
    ...
}

// This example only covers the HTTP API
func (a *DaprRuntime) startHTTPServer(port int, publicPort *int, profilePort int, allowedOrigins string, pipeline http_middleware.Pipeline) error {
    a.daprHTTPAPI = http.NewAPI(a.runtimeConfig.ID,
        a.appChannel,
        a.directMessaging,
        a.getComponents,
        a.resiliency,
        a.stateStores,
        a.lockStores,
        a.secretStores,
        a.secretsConfiguration,
        a.configurationStores,
        a.getPublishAdapter(),
        a.actor,
        a.sendToOutputBinding,
        a.globalConfig.Spec.TracingSpec,
        a.ShutdownWithWait,
        a.getComponentsCapabilitesMap,
        a.mytests,
    )
...
}

// During initialization, the runtime discovers components in the components directory and initializes them
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
    switch category {
    ...
    case mytestComponent:
        return a.initMytest(comp)
    }
    return nil
}

func (a *DaprRuntime) initMytest(c components_v1alpha1.Component) error {
    mytestIns, err := a.mytestRegistry.Create(c.Spec.Type, c.Spec.Version)
    if err != nil {
        log.Errorf("error creating component %s: %s", c.ObjectMeta.Name, err)
        // Return early: mytestIns is nil here, so calling Init on it would panic.
        return err
    }
    a.mytests[c.ObjectMeta.Name] = mytestIns
    properties := a.convertMetadataItemsToProperties(c.Spec.Metadata)
    log.Debug("properties is", properties)
    mytestIns.Init(mytest.Metadata{Properties: properties})
    return nil
}
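
The initialization step boils down to three things: create the instance from the registry, turn the YAML metadata items into a map, and store the instance under the component name. The sketch below shows that flow, including an early return when creation fails. Every name in it (metadataItem, create, toProperties, initComponent) is a hypothetical stand-in; it does not reproduce Dapr's convertMetadataItemsToProperties.

```go
package main

import (
    "errors"
    "fmt"
)

// metadataItem is a stand-in for one name/value entry under spec.metadata.
type metadataItem struct {
    Name, Value string
}

type component interface {
    Init(props map[string]string)
}

type demo struct {
    props map[string]string
}

func (d *demo) Init(props map[string]string) { d.props = props }

// create is a stand-in for the registry's Create method.
func create(typ string) (component, error) {
    if typ != "mytest.demof" {
        return nil, errors.New("couldn't find Mytest " + typ)
    }
    return &demo{}, nil
}

// toProperties flattens the metadata items into a map.
func toProperties(items []metadataItem) map[string]string {
    props := make(map[string]string, len(items))
    for _, it := range items {
        props[it.Name] = it.Value
    }
    return props
}

// initComponent creates, initializes, and stores a component.
// It returns before touching the instance if creation failed.
func initComponent(store map[string]component, name, typ string, items []metadataItem) error {
    ins, err := create(typ)
    if err != nil {
        return fmt.Errorf("error creating component %s: %w", name, err)
    }
    ins.Init(toProperties(items))
    store[name] = ins
    return nil
}

func main() {
    store := map[string]component{}
    err := initComponent(store, "demof", "mytest.demof", []metadataItem{{Name: "mName", Value: "mValue"}})
    fmt.Println(err, store["demof"].(*demo).props)

    err = initComponent(store, "bad", "mytest.unknown", nil)
    fmt.Println(err, len(store))
}
```

Returning on the error path keeps a failed component out of the map, so later lookups by name cannot hand out a nil instance.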

Implement the Dapr HTTP API endpoint

github.com/dapr/dapr/pkg/http/api.go

import ("github.com/dapr/components-contrib/mytest")

type api struct {
    ....
    mytests              map[string]mytest.Mytest
}

const (
    ...
    mytestParam          = "mytestName"
)

// NewAPI returns a new API.
func NewAPI(
    ...
    mytests map[string]mytest.Mytest,
) API {
    ...
    api := &api{
        ...
        mytests:              mytests,
    }
    ...
    api.endpoints = append(api.endpoints, api.constructMytestEndpoints()...)

    return api
}
// constructMytestEndpoints registers the Mytest component API.
func (a *api) constructMytestEndpoints() []Endpoint {
    return []Endpoint{
        {
            Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost},
            Route:   "mytest/{mytestName}/info",
            Version: apiVersionV1,
            Handler: a.onMytestInfo,
        },
    }
}

// getMytestWithRequestValidation looks up the Mytest component instance named in the request.
// messages.ErrMytestNotFound is not shown in this article and has to be added to the messages package.
func (a *api) getMytestWithRequestValidation(reqCtx *fasthttp.RequestCtx) (mytest.Mytest, string, error) {
    if a.mytests == nil || len(a.mytests) == 0 {
        msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", messages.ErrMytestNotFound)
        respond(reqCtx, withError(fasthttp.StatusInternalServerError, msg))
        log.Debug(msg)
        return nil, "", errors.New(msg.Message)
    }

    mytestName := a.getMytestName(reqCtx)
    if a.mytests[mytestName] == nil {
        msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", fmt.Sprintf(messages.ErrMytestNotFound, mytestName))
        respond(reqCtx, withError(fasthttp.StatusBadRequest, msg))
        log.Debug(msg)
        return nil, "", errors.New(msg.Message)
    }
    return a.mytests[mytestName], mytestName, nil
}

func (a *api) getMytestName(reqCtx *fasthttp.RequestCtx) string {
    return reqCtx.UserValue(mytestParam).(string)
}

func (a *api) onMytestInfo(reqCtx *fasthttp.RequestCtx) {
    log.Debug("calling mytest components")
    mytestInstance, _, err := a.getMytestWithRequestValidation(reqCtx)
    if err != nil {
        log.Debug(err)
        return
    }

    // Mytest.Info takes a string, so pass the request body through.
    mytestInstance.Info(string(reqCtx.PostBody()))
    respond(reqCtx, withEmpty())
}
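
The routing logic is independent of fasthttp: take the component name from the path, look it up in the map, and answer with an error status when it is missing. The sketch below reproduces that idea with the standard library's net/http and httptest so it runs on its own. The infoer interface, whose Info returns a string so the result is visible in the response, and the manual path parsing are assumptions made for this illustration; they are not how Dapr's router works.

```go
package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "strings"
)

// infoer is a stand-in for the Mytest interface; Info returns its
// message here so the handler can echo it back.
type infoer interface {
    Info(info string) string
}

type demo struct {
    name string
}

func (d demo) Info(info string) string {
    return "this is " + d.name + ", I received " + info
}

// newHandler serves /v1.0/mytest/{name}/info for the given components.
func newHandler(components map[string]infoer) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
        if len(parts) != 4 || parts[0] != "v1.0" || parts[1] != "mytest" || parts[3] != "info" {
            http.NotFound(w, r)
            return
        }
        c, ok := components[parts[2]]
        if !ok {
            http.Error(w, "mytest "+parts[2]+" not found", http.StatusBadRequest)
            return
        }
        fmt.Fprint(w, c.Info(r.Method))
    })
}

// call sends a GET request to the handler and returns status and body.
func call(h http.Handler, path string) (int, string) {
    rec := httptest.NewRecorder()
    h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
    return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
    h := newHandler(map[string]infoer{"demof": demo{name: "Demof"}})

    code, body := call(h, "/v1.0/mytest/demof/info")
    fmt.Println(code, body)

    code, body = call(h, "/v1.0/mytest/demos/info")
    fmt.Println(code, body)
}
```

As in the article's handler, a request for a component that was never loaded gets a 400 response instead of reaching the component code.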

Dapr runtime initialization entry point

github.com/dapr/dapr/cmd/daprd/main.go

import (
    //mytest demo
    "github.com/dapr/components-contrib/mytest"
    mytest_demof "github.com/dapr/components-contrib/mytest/demof"
    mytest_demos "github.com/dapr/components-contrib/mytest/demos"
    mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)
...

runtime.WithSecretStores(
...
        runtime.WithMytests(
            mytest_loader.New("demof", func() mytest.Mytest {
                return mytest_demof.NewDemof(logContrib)
            }),
            mytest_loader.New("demos", func() mytest.Mytest {
                return mytest_demos.NewDemos(logContrib)
            }),
        ),
...
)
....

Add a mytest.yaml configuration file to the components directory

By default this is the .dapr/components directory

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: demof
spec:
  type: mytest.demof
  version: v1
  metadata:
  - name: mName
    value: mValue
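
The spec.type value matters because its prefix before the dot is what ties the YAML file to the "mytest" category added to componentCategoriesNeedProcess. The following is a sketch of that kind of prefix match, written as an assumption about the mechanism rather than a copy of Dapr's code. The categories slice and the categoryOf function are hypothetical names.

```go
package main

import (
    "fmt"
    "strings"
)

// categories is a hypothetical list of known component categories.
var categories = []string{"state", "pubsub", "mytest"}

// categoryOf returns the category whose "<category>." prefix matches
// the component type, or "" when none matches.
func categoryOf(specType string) string {
    for _, c := range categories {
        if strings.HasPrefix(specType, c+".") {
            return c
        }
    }
    return ""
}

func main() {
    fmt.Println(categoryOf("mytest.demof"))
    fmt.Printf("%q\n", categoryOf("unknown.demo"))
}
```

With a match like this, a type such as mytest.demof is routed to the Mytest initializer, while a type with an unregistered prefix is not processed at all.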

Build and run Dapr

go mod edit -replace github.com/dapr/components-contrib=../components-contrib

go mod tidy

make DEBUG=1 build

# Back up the current daprd
cp ~/.dapr/bin/daprd ~/.dapr/bin/daprd.bak
cp ./dist/darwin_amd64/debug/daprd ~/.dapr/bin

dapr run --app-id myapp --dapr-http-port 3500  --log-level debug

Check the Dapr startup log

DEBU[0000] loading component. name: demof, type: mytest.demof/v1  app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] component loaded. name: demof, type: mytest.demof/v1  app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] {map[mName:mValue]}                           app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

The demof component has loaded successfully. Next, use the HTTP API to call the demof info endpoint

curl http://localhost:3500/v1.0/mytest/demof/info

The log shows the message we print from the component

INFO[0126] this is Demof, I'm working                    app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

Now edit the mytest.yaml configuration file to instantiate demos

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: demos
spec:
  type: mytest.demos
  version: v1
  metadata:
  - name: mName
    value: mValue

Try calling the demos info endpoint

curl http://localhost:3500/v1.0/mytest/demos/info
INFO[0058] this is Demos, I'm working                    app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

The component works as expected.

The sample code above is available on GitHub:
https://github.com/RcXu/dapr.git
https://github.com/RcXu/components-contrib.git
Branch name: dev-demo

Thanks for reading. Corrections are welcome!
