共计 4439 个字符,预计需要花费 12 分钟才能阅读完成。
序
本文次要钻研一下 golang 的 zap 的 Sink
Sink
zap@v1.16.0/sink.go
type Sink interface {
zapcore.WriteSyncer
io.Closer
}
type WriteSyncer interface {
io.Writer
Sync() error}
type Writer interface {Write(p []byte) (n int, err error)
}
type Closer interface {Close() error
}
Sink 接口内嵌了 zapcore.WriteSyncer(
Write、Sync
)、io.Closer(Close
)接口
RegisterSink
zap@v1.16.0/sink.go
const schemeFile = "file"
var (
_sinkMutex sync.RWMutex
_sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)
func init() {resetSinkRegistry()
}
func resetSinkRegistry() {_sinkMutex.Lock()
defer _sinkMutex.Unlock()
_sinkFactories = map[string]func(*url.URL) (Sink, error){schemeFile: newFileSink,}
}
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {_sinkMutex.Lock()
defer _sinkMutex.Unlock()
if scheme == "" {return errors.New("can't register a sink factory for empty string")
}
normalized, err := normalizeScheme(scheme)
if err != nil {return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
}
if _, ok := _sinkFactories[normalized]; ok {return fmt.Errorf("sink factory already registered for scheme %q", normalized)
}
_sinkFactories[normalized] = factory
return nil
}
RegisterSink 办法会往_sinkFactories 注册指定 scheme 的 sink factory,该 factory 接管 url.URL 返回 Sink;resetSinkRegistry 办法默认注册了 scheme 为 file 的 newFileSink
newFileSink
zap@v1.16.0/sink.go
func newFileSink(u *url.URL) (Sink, error) {
if u.User != nil {return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)
}
if u.Fragment != "" {return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)
}
if u.RawQuery != "" {return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)
}
// Error messages are better if we check hostname and port separately.
if u.Port() != "" {return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)
}
if hn := u.Hostname(); hn != ""&& hn !="localhost" {return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)
}
switch u.Path {
case "stdout":
return nopCloserSink{os.Stdout}, nil
case "stderr":
return nopCloserSink{os.Stderr}, nil
}
return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
}
newFileSink 应用 os.OpenFile 创立
*os.File
,因为*os.File
领有 Write、Sync、Close 办法,因此它实现了 Sink 接口
newSink
zap@v1.16.0/sink.go
func newSink(rawURL string) (Sink, error) {u, err := url.Parse(rawURL)
if err != nil {return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
}
if u.Scheme == "" {u.Scheme = schemeFile}
_sinkMutex.RLock()
factory, ok := _sinkFactories[u.Scheme]
_sinkMutex.RUnlock()
if !ok {return nil, &errSinkNotFound{u.Scheme}
}
return factory(u)
}
newSink 办法会依据 rawURL 解析对应的 scheme,如果 scheme 为空则默认为 file,而后从_sinkFactories 找到对应的 factory,创立 sink 返回
open
zap@v1.16.0/writer.go
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {writers, close, err := open(paths)
if err != nil {return nil, nil, err}
writer := CombineWriteSyncers(writers...)
return writer, close, nil
}
func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {writers := make([]zapcore.WriteSyncer, 0, len(paths))
closers := make([]io.Closer, 0, len(paths))
close := func() {
for _, c := range closers {c.Close()
}
}
var openErr error
for _, path := range paths {sink, err := newSink(path)
if err != nil {openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
continue
}
writers = append(writers, sink)
closers = append(closers, sink)
}
if openErr != nil {close()
return writers, nil, openErr
}
return writers, close, nil
}
zap.Open 办法会应用 newSink 来创立 sink 作为 zapcore.WriteSyncer
实例
func registerSinkDemo() {zap.RegisterSink("mq", mq.NewMqSink)
writer, close, err := zap.Open("mq://192.168.99.100:9876/log")
if err != nil {panic(err)
}
defer close()
logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar()
logger.Info("hello")
}
type MqWriteSyncer struct {
topic string
producer rocketmq.Producer
ctx context.Context
}
func (m *MqWriteSyncer) Close() error {return m.producer.Shutdown()
}
func (m *MqWriteSyncer) Write(p []byte) (n int, err error) {
msg := &primitive.Message{
Topic: m.topic,
Body: p,
}
err = m.producer.SendOneWay(m.ctx, msg)
return len(p), err
}
func (m *MqWriteSyncer) Sync() error {return nil}
func NewMqSink(url *url.URL) (zap.Sink, error) {broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port())
topic := url.Path[1:len(url.Path)]
p, _ := rocketmq.NewProducer(producer.WithNameServer([]string{broker}),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {fmt.Printf("start producer error: %s", err.Error())
return nil, err
}
return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil
}
这里通过 zap.RegisterSink 来注册一个 mq 的 sink factory,而后通过 zap.Open 来创立 MqWriteSyncer;MqWriteSyncer 实现了 zapcore.WriteSyncer 的 Write、Sync 办法,同时也实现了 Sink 的 Close 办法
小结
Sink 接口内嵌了 zapcore.WriteSyncer(Write、Sync
)、io.Closer(Close
)接口;zap.RegisterSink 用于注册指定 scheme 的 sink factory,而 zap.Open 则会解析 url 来找到对应的 sink factory 创立对应的 sink,即 writer。
doc
- zap