序
本文次要钻研一下 storagetapper 的 pipe
Pipe
storagetapper/pipe/pipe.go
type Pipe interface {NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error}
Pipe 接口定义了 NewConsumer、NewProducer、Type、Config、Close 办法
Consumer
storagetapper/pipe/pipe.go
type Consumer interface {Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
Consumer 接口定义了 Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat 办法
Producer
storagetapper/pipe/pipe.go
type Producer interface {Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
Producer 接口定义了 Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey
Create
storagetapper/pipe/pipe.go
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {init := Pipes[strings.ToLower(pipeType)]
if init == nil {return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
}
pipe, err := init(cfg, db)
if err != nil {return nil, err}
return pipe, nil
}
type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor
//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
if Pipes == nil {Pipes = make(map[string]constructor)
}
Pipes[name] = init
}
Create 办法依据 pipeType、PipeConfig、db 来创立 pipe
小结
storagetapper 的 Pipe 接口定义了 NewConsumer、NewProducer、Type、Config、Close 办法,其 Create 办法依据 pipeType、PipeConfig、db 来创立 pipe。
doc
- storagetapper