共计 9089 个字符,预计需要花费 23 分钟才能阅读完成。
明天跟大家分享一个小教程,如何本人入手写一个新组件,并加载到 Dapr runtime。
后期筹备
- 对于 Dapr runtime 的启动过程,如何加载和实例化组件能够参考之前写的文章 Dapr runtime 启动过程
- Dapr 开发环境配置,能够参考 Dapr 开发环境配置
开始编码
克隆 Dapr 和 component-contrib
cd $GOPATH/src
# Clone dapr
mkdir -p github.com/dapr/dapr
git clone https://github.com/dapr/dapr.git github.com/dapr/dapr
# Clone component-contrib
mkdir -p github.com/dapr/components-contrib
git clone https://github.com/dapr/components-contrib.git github.com/dapr/components-contrib
编写组件
假如,咱们组件名字叫:Mytest
首先在 component-contrib
我的项目下,创立 mytest 包。
- 在 mytest 文件夹创立接口束缚文件:
github.com/dapr/components-contrib/mytest/mytest.go
mytest 组件提供两个办法,Init 和 Info
package mytest
type Mytest interface {
// Init this component.
Init(metadata Metadata)
// Info show method
Info(info string)
}
metadata.go
:接管组件配置文件 YAML 中定义的 Metadata 字段
package mytest
type Metadata struct {Properties map[string]string `json:"properties"`
}
3. 为 mytest 组件提供两种实现形式:demof 和 demos
在 mytest 包里创立 demof 包和 demos 包,别离在 demof.go 和 demos.go 实现 Mytest 组件demof.go
package demof
import (
"github.com/dapr/components-contrib/mytest"
"github.com/dapr/kit/logger"
)
type Demof struct {logger logger.Logger}
func NewDemof(logger logger.Logger) *Demof {
d := &Demof{logger: logger,}
return d
}
func (d *Demof) Init(metadata mytest.Metadata) {d.logger.Info(metadata)
}
func (d *Demof) Info(info string) {d.logger.Info("this is Demof, I received %s", info)
}
demos.go
package demos
import (
"github.com/dapr/components-contrib/mytest"
"github.com/dapr/kit/logger"
)
type Demos struct {logger logger.Logger}
func NewDemos(logger logger.Logger) *Demos {
d := &Demos{logger: logger,}
return d
}
func (d *Demos) Init(metadata mytest.Metadata) {d.logger.Info(metadata)
}
func (d *Demos) Info(info string) {d.logger.Info("this is Demos, I received %s", info)
}
至此 Mytest 组件曾经具备了能够执行的所有因素,目录解构如下
component-contrib
|_mytest
|_demof
|_demof.go
|_demos
|_demos.go
metadata.go
mytest.go
将 Mytest 组件注册到 Dapr runtime
- 实现 Mytest 组件注册接口
github.com/dapr/dapr/pkg/components/mytest/registry.go
package mytest
import (
"strings"
"github.com/pkg/errors"
"github.com/dapr/components-contrib/mytest"
"github.com/dapr/dapr/pkg/components"
)
type Mytest struct {
Name string
FactoryMethod func() mytest.Mytest}
func New(name string, factoryMethod func() mytest.Mytest) Mytest {
return Mytest{
Name: name,
FactoryMethod: factoryMethod,
}
}
type Registry interface {Register(components ...Mytest)
Create(name, version string) (mytest.Mytest, error)
}
type mytestRegistry struct {mytests map[string]func() mytest.Mytest}
func NewRegistry() Registry {
return &mytestRegistry{mytests: map[string]func() mytest.Mytest{},
}
}
func (t *mytestRegistry) Register(components ...Mytest) {
for _, component := range components {t.mytests[createFullName(component.Name)] = component.FactoryMethod
}
}
func (t *mytestRegistry) Create(name, version string) (mytest.Mytest, error) {if method, ok := t.getMytest(name, version); ok {return method(), nil
}
return nil, errors.Errorf("couldn't find Mytest %s/%s", name, version)
}
func (t *mytestRegistry) getMytest(name, version string) (func() mytest.Mytest, bool) {nameLower := strings.ToLower(name)
versionLower := strings.ToLower(version)
mytestFn, ok := t.mytests[nameLower+"/"+versionLower]
if ok {return mytestFn, true}
if components.IsInitialVersion(versionLower) {mytestFn, ok = t.mytests[nameLower]
}
return mytestFn, ok
}
func createFullName(name string) string {return strings.ToLower("mytest." + name)
}
2. 更新 runtime 组件发现和注册机制 github.com/dapr/dapr/pkg/runtime/options.go
扩大 runtimeOpts
import("github.com/dapr/dapr/pkg/components/mytest")
runtimeOpts struct {
...
mytests []mytest.Mytest}
增加 runtime 组件注册函数
func WithMytests(mytests ...mytest.Mytest) Option {return func(o *runtimeOpts) {o.mytests = append(o.mytests, mytests...)
}
}
更新 runtime 启动流程,注册 Mytest 组件github.com/dapr/dapr/pkg/runtime/runtime.go
import(
....
"github.com/dapr/components-contrib/mytest"
mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)
...
// 更新变量申明
const (mytestComponent ComponentCategory = "mytest")
var componentCategoriesNeedProcess = []ComponentCategory{
...
mytestComponent,
}
...
// 扩大 DaprRuntime components of the runtime.
type DaprRuntime struct {
...
mytestRegistry mytest_loader.Registry
mytests map[string]mytest.Mytest
}
// NewDaprRuntime returns a new runtime with the given runtime config and global config.
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList, resiliencyProvider resiliency.Provider) *DaprRuntime {ctx, cancel := context.WithCancel(context.Background())
return &DaprRuntime{
...
mytestRegistry: mytest_loader.NewRegistry(),
mytests: map[string]mytest.Mytest{},}
}
...
// 初始化 runtime
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
...
a.mytestRegistry.Register(opts.mytests...)
...
}
// 本示例仅以 http api 接口为例
func (a *DaprRuntime) startHTTPServer(port int, publicPort *int, profilePort int, allowedOrigins string, pipeline http_middleware.Pipeline) error {
a.daprHTTPAPI = http.NewAPI(a.runtimeConfig.ID,
a.appChannel,
a.directMessaging,
a.getComponents,
a.resiliency,
a.stateStores,
a.lockStores,
a.secretStores,
a.secretsConfiguration,
a.configurationStores,
a.getPublishAdapter(),
a.actor,
a.sendToOutputBinding,
a.globalConfig.Spec.TracingSpec,
a.ShutdownWithWait,
a.getComponentsCapabilitesMap,
a.mytests,
)
...
}
//runtime 初始化过程从 components 目录 发现并初始化组件
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
...
case mytestComponent:
return a.initMytest(comp)
}
return nil
}
func (a *DaprRuntime) initMytest(c components_v1alpha1.Component) error {mytestIns, err := a.mytestRegistry.Create(c.Spec.Type, c.Spec.Version)
if err != nil {log.Errorf("error create component %s: %s", c.ObjectMeta.Name, err)
}
a.mytests[c.ObjectMeta.Name] = mytestIns
properties := a.convertMetadataItemsToProperties(c.Spec.Metadata)
log.Debug("properties is", properties)
mytestIns.Init(mytest.Metadata{Properties: properties,})
return err
}
实现 Dapr http api 调用接口
github.com/dapr/dapr/pkg/http/api.go
import ("github.com/dapr/components-contrib/mytest")
type api struct {
....
mytests map[string]mytest.Mytest
}
const (
...
mytestParam = "mytestName"
)
// NewAPI returns a new API.
func NewAPI(
...
mytests map[string]mytest.Mytest,
) API {
...
api := &api{
...
mytests: mytests,
}
...
api.endpoints = append(api.endpoints, api.constructMytestEndpoints()...)
return api
}
/**
* regist Mytest component api
*/
func (a *api) constructMytestEndpoints() []Endpoint {return []Endpoint{
{Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost},
Route: "mytest/{mytestName}/info",
Version: apiVersionV1,
Handler: a.onMytestInfo,
},
}
}
/**
* switch Mytest component instance
*/
func (a *api) getMytestWithRequestValidation(reqCtx *fasthttp.RequestCtx) (mytest.Mytest, string, error) {if a.mytests == nil || len(a.mytests) == 0 {msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", messages.ErrMytestNotFound)
respond(reqCtx, withError(fasthttp.StatusInternalServerError, msg))
log.Debug(msg)
return nil, "", errors.New(msg.Message)
}
mytestName := a.getMytestName(reqCtx)
if a.mytests[mytestName] == nil {msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", fmt.Sprintf(messages.ErrMytestNotFound, mytestName))
respond(reqCtx, withError(fasthttp.StatusBadRequest, msg))
log.Debug(msg)
return nil, "", errors.New(msg.Message)
}
return a.mytests[mytestName], mytestName, nil
}
func (a *api) getMytestName(reqCtx *fasthttp.RequestCtx) string {return reqCtx.UserValue(mytestParam).(string)
}
func (a *api) onMytestInfo(reqCtx *fasthttp.RequestCtx) {log.Debug("calling mytest components")
mytestInstance, _, err := a.getMytestWithRequestValidation(reqCtx)
if err != nil {log.Debug(err)
return
}
mytestInstance.Info()
respond(reqCtx, withEmpty())
}
Dapr runtime 初始化入口
github.com/dapr/dapr/cmd/daprd/main.go
import (
//mytest demo
"github.com/dapr/components-contrib/mytest"
mytest_demof "github.com/dapr/components-contrib/mytest/demof"
mytest_demos "github.com/dapr/components-contrib/mytest/demos"
mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)
...
runtime.WithSecretStores(
...
runtime.WithMytests(mytest_loader.New("demof", func() mytest.Mytest {return mytest_demof.NewDemof(logContrib)
}),
mytest_loader.New("demos", func() mytest.Mytest {return mytest_demos.NewDemos(logContrib)
}),
),
...
)
....
在 component 目录中增加 mytest.yaml 配置文件
默认是在 .dapr/components
目录
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: demof
spec:
type: mytest.demof
version: v1
metadata:
- name: mName
value: mValue
编译并运行 Dapr
go mod edit -replace github.com/dapr/components-contrib=../components-contrib
go mod tidy
make DEBUG=1 build
# Back up the current daprd
cp ~/.dapr/bin/daprd ~/.dapr/bin/daprd.bak
cp ./dist/darwin_amd64/debug/daprd ~/.dapr/bin
dapr run --app-id myapp --dapr-http-port 3500 --log-level debug
察看 Dapr 启动日志
DEBU[0000] loading component. name: demof, type: mytest.demof/v1 app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] component loaded. name: demof, type: mytest.demof/v1 app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] {map[mName:mValue]} app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge
demof 组件曾经加载胜利,接下来应用 http api 接口尝试调用 demof info 接口
curl http://localhost:3500/v1.0/mytest/demof/info
日志记录到了咱们在组件中打印的信息
INFO[0126] this is Demof, I'm working app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge
批改一下 mytest.yaml 配置文件,实例化 demos
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: demos
spec:
type: mytest.demos
version: v1
metadata:
- name: mName
value: mValue
尝试调用 demos info 接口
curl http://localhost:3500/v1.0/mytest/demos/info
INFO[0058] this is Demos, I'm working app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge
组件失常工作。
以上示例代码曾经放在 github 上
https://github.com/RcXu/dapr.git
https://github.com/RcXu/components-contrib.git
分支名为:dev-demo
多谢浏览,敬请斧正!