发布于 3年前
storagetapper的pipe介绍
序
本文主要研究一下storagetapper的pipe
Pipe
storagetapper/pipe/pipe.go
type Pipe interface {
NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error
}
Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法
Consumer
storagetapper/pipe/pipe.go
type Consumer interface {
Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
Consumer接口定义了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法
Producer
storagetapper/pipe/pipe.go
type Producer interface {
Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
Producer接口定义了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey
Create
storagetapper/pipe/pipe.go
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
init := Pipes[strings.ToLower(pipeType)]
if init == nil {
return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
}
pipe, err := init(cfg, db)
if err != nil {
return nil, err
}
return pipe, nil
}
type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor
//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
if Pipes == nil {
Pipes = make(map[string]constructor)
}
Pipes[name] = init
}
Create方法根据pipeType、PipeConfig、db来创建pipe
小结
storagetapper的Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根据pipeType、PipeConfig、db来创建pipe。