序
本文主要研究一下golang的zap的Sink
Sink
zap@v1.16.0/sink.go
type Sink interface { zapcore.WriteSyncer io.Closer } type WriteSyncer interface { io.Writer Sync() error } type Writer interface { Write(p []byte) (n int, err error) } type Closer interface { Close() error }
Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口
RegisterSink
zap@v1.16.0/sink.go
const schemeFile = "file" var ( _sinkMutex sync.RWMutex _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme ) func init() { resetSinkRegistry() } func resetSinkRegistry() { _sinkMutex.Lock() defer _sinkMutex.Unlock() _sinkFactories = map[string]func(*url.URL) (Sink, error){ schemeFile: newFileSink, } } func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error { _sinkMutex.Lock() defer _sinkMutex.Unlock() if scheme == "" { return errors.New("can't register a sink factory for empty string") } normalized, err := normalizeScheme(scheme) if err != nil { return fmt.Errorf("%q is not a valid scheme: %v", scheme, err) } if _, ok := _sinkFactories[normalized]; ok { return fmt.Errorf("sink factory already registered for scheme %q", normalized) } _sinkFactories[normalized] = factory return nil }
RegisterSink方法会往_sinkFactories注册指定scheme的sink factory,该factory接收url.URL返回Sink;resetSinkRegistry方法默认注册了scheme为file的newFileSink
newFileSink
zap@v1.16.0/sink.go
func newFileSink(u *url.URL) (Sink, error) { if u.User != nil { return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u) } if u.Fragment != "" { return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u) } if u.RawQuery != "" { return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u) } // Error messages are better if we check hostname and port separately. if u.Port() != "" { return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u) } if hn := u.Hostname(); hn != "" && hn != "localhost" { return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u) } switch u.Path { case "stdout": return nopCloserSink{os.Stdout}, nil case "stderr": return nopCloserSink{os.Stderr}, nil } return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) }
newFileSink使用os.OpenFile创建*os.File,由于*os.File拥有Write、Sync、Close方法,因而它实现了Sink接口
newSink
zap@v1.16.0/sink.go
func newSink(rawURL string) (Sink, error) { u, err := url.Parse(rawURL) if err != nil { return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err) } if u.Scheme == "" { u.Scheme = schemeFile } _sinkMutex.RLock() factory, ok := _sinkFactories[u.Scheme] _sinkMutex.RUnlock() if !ok { return nil, &errSinkNotFound{u.Scheme} } return factory(u) }
newSink方法会根据rawURL解析对应的scheme,如果scheme为空则默认为file,然后从_sinkFactories找到对应的factory,创建sink返回
open
zap@v1.16.0/writer.go
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) { writers, close, err := open(paths) if err != nil { return nil, nil, err } writer := CombineWriteSyncers(writers...) return writer, close, nil } func open(paths []string) ([]zapcore.WriteSyncer, func(), error) { writers := make([]zapcore.WriteSyncer, 0, len(paths)) closers := make([]io.Closer, 0, len(paths)) close := func() { for _, c := range closers { c.Close() } } var openErr error for _, path := range paths { sink, err := newSink(path) if err != nil { openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err)) continue } writers = append(writers, sink) closers = append(closers, sink) } if openErr != nil { close() return writers, nil, openErr } return writers, close, nil }
zap.Open方法会使用newSink来创建sink作为zapcore.WriteSyncer
实例
func registerSinkDemo() { zap.RegisterSink("mq", mq.NewMqSink) writer, close, err := zap.Open("mq://192.168.99.100:9876/log") if err != nil { panic(err) } defer close() logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar() logger.Info("hello") } type MqWriteSyncer struct { topic string producer rocketmq.Producer ctx context.Context } func (m *MqWriteSyncer) Close() error { return m.producer.Shutdown() } func (m *MqWriteSyncer) Write(p []byte) (n int, err error) { msg := &primitive.Message{ Topic: m.topic, Body: p, } err = m.producer.SendOneWay(m.ctx, msg) return len(p), err } func (m *MqWriteSyncer) Sync() error { return nil } func NewMqSink(url *url.URL) (zap.Sink, error) { broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port()) topic := url.Path[1:len(url.Path)] p, _ := rocketmq.NewProducer( producer.WithNameServer([]string{broker}), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) return nil, err } return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil }
这里通过zap.RegisterSink来注册一个mq的sink factory,然后通过zap.Open来创建MqWriteSyncer;MqWriteSyncer实现了zapcore.WriteSyncer的Write、Sync方法,同时也实现了Sink的Close方法
小结
Sink接口内嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)接口;zap.RegisterSink用于注册指定scheme的sink factory,而zap.Open则会解析url来找到对应的sink factory创建对应的sink,即writer。
doc
- zap
有疑问加站长微信联系(非本文作者)