This article takes a look at the pipe abstraction in storagetapper.

Pipe

storagetapper/pipe/pipe.go

type Pipe interface {
    NewConsumer(topic string) (Consumer, error)
    NewProducer(topic string) (Producer, error)
    Type() string
    Config() *config.PipeConfig
    Close() error
}

The Pipe interface defines the NewConsumer, NewProducer, Type, Config, and Close methods.

Consumer

storagetapper/pipe/pipe.go

type Consumer interface {
    Close() error
    //CloseOnFailure doesn't save offsets
    CloseOnFailure() error
    Message() chan interface{}
    Error() chan error
    FetchNext() (interface{}, error)
    //Allows to explicitly persists current consumer position
    SaveOffset() error
    //SetFormat allow to tell consumer the format of the file when there is no
    //header
    SetFormat(format string)
}

The Consumer interface defines the Close, CloseOnFailure, Message, Error, FetchNext, SaveOffset, and SetFormat methods. Per the comments in the source, CloseOnFailure closes the consumer without saving offsets, and SaveOffset persists the current consumer position explicitly.

Producer

storagetapper/pipe/pipe.go

type Producer interface {
    Push(data interface{}) error
    PushK(key string, data interface{}) error
    PushSchema(key string, data []byte) error
    //PushBatch queues the messages instead of sending immediately
    PushBatch(key string, data interface{}) error
    //PushCommit writes out all the messages queued by PushBatch
    PushBatchCommit() error
    Close() error
    CloseOnFailure() error
    SetFormat(format string)
    PartitionKey(source string, key string) string
}

The Producer interface defines the Push, PushK, PushSchema, PushBatch, PushBatchCommit, Close, CloseOnFailure, SetFormat, and PartitionKey methods. Per the comments in the source, PushBatch queues messages instead of sending them immediately, and PushBatchCommit writes out everything queued by PushBatch.
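The queue-then-commit behaviour described in the PushBatch and PushBatchCommit comments can be illustrated with a minimal in-memory sketch. The batchProducer type and its queued and sent fields are hypothetical and exist only for this illustration; they are not part of storagetapper, and real pipe implementations write to an actual backend.

```go
package main

import "fmt"

// batchProducer is a hypothetical in-memory stand-in that implements only
// the two batch methods of the Producer interface.
type batchProducer struct {
	queued []interface{} // messages waiting for a commit
	sent   []interface{} // messages that have been "written out"
}

// PushBatch queues the message instead of sending it immediately.
// The key is ignored in this sketch.
func (p *batchProducer) PushBatch(key string, data interface{}) error {
	p.queued = append(p.queued, data)
	return nil
}

// PushBatchCommit writes out everything queued by PushBatch and empties the queue.
func (p *batchProducer) PushBatchCommit() error {
	p.sent = append(p.sent, p.queued...)
	p.queued = nil
	return nil
}

func main() {
	p := &batchProducer{}
	_ = p.PushBatch("k1", "a")
	_ = p.PushBatch("k2", "b")
	fmt.Println(len(p.queued), len(p.sent)) // prints "2 0"

	_ = p.PushBatchCommit()
	fmt.Println(len(p.queued), len(p.sent)) // prints "0 2"
}
```

Nothing reaches the sent slice until PushBatchCommit is called, which is the point of the split between the two methods.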

Create

storagetapper/pipe/pipe.go

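func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
    init := Pipes[strings.ToLower(pipeType)]
    if init == nil {
        return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
    }

    pipe, err := init(cfg, db)
    if err != nil {
        return nil, err
    }

    return pipe, nil
}

type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)

//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor

//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
    if Pipes == nil {
        Pipes = make(map[string]constructor)
    }
    Pipes[name] = init
}

The Create function builds a pipe from pipeType, PipeConfig, and db. It looks up the constructor registered in the Pipes map under the lowercased pipeType, returns an "unsupported pipe" error if none is registered, and otherwise calls that constructor with cfg and db. Plugins add their constructors to the map by calling registerPlugin from their init functions; registerPlugin creates the map on first use.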
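The registry pattern behind Create can be tried out in isolation with the sketch below. It is a simplified, self-contained illustration and not storagetapper's code: the Pipe interface is trimmed to Type only, the cfg and db parameters are dropped, and memPipe, pipes, and create are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// Pipe is a trimmed-down stand-in for the real interface (assumption:
// only Type is kept so the sketch stays short).
type Pipe interface {
	Type() string
}

// memPipe is a hypothetical pipe implementation used only for illustration.
type memPipe struct{}

func (memPipe) Type() string { return "memory" }

// constructor mirrors the original constructor type, minus cfg and db.
type constructor func() (Pipe, error)

// pipes plays the role of the Pipes registry.
var pipes map[string]constructor

// registerPlugin creates the map on first use and stores the constructor.
func registerPlugin(name string, ctor constructor) {
	if pipes == nil {
		pipes = make(map[string]constructor)
	}
	pipes[name] = ctor
}

// create looks up the constructor by lowercased type name and calls it.
func create(pipeType string) (Pipe, error) {
	ctor := pipes[strings.ToLower(pipeType)]
	if ctor == nil {
		return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
	}
	return ctor()
}

// init registers the plugin before main runs, as a plugin file would.
func init() {
	registerPlugin("memory", func() (Pipe, error) { return memPipe{}, nil })
}

func main() {
	p, err := create("Memory") // lookup is case-insensitive
	fmt.Println(p.Type(), err) // prints "memory <nil>"

	_, err = create("kafka") // nothing registered under this name here
	fmt.Println(err)         // prints "unsupported pipe: kafka"
}
```

Because the lookup lowercases only the requested type, this sketch (like the original registerPlugin) relies on plugins registering themselves under lowercase names.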

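Summary

storagetapper's Pipe interface defines the NewConsumer, NewProducer, Type, Config, and Close methods. Its Create function builds a pipe from pipeType, PipeConfig, and db by looking up a constructor that a plugin has registered in the Pipes map.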

doc

  • storagetapper