go micro broker

舞林 · · 80 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

micro.newService()中newOptions

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {
        o(&opt)
    }

    return opt
}

初始化了一堆基础设置,先来看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()

// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
    return newHttpBroker(opts...)
}

func newHttpBroker(opts ...Option) Broker {
    options := Options{
        Codec:    json.Marshaler{},
        Context:  context.TODO(),
        Registry: registry.DefaultRegistry,
    }

    for _, o := range opts {
        o(&options)
    }

    // set address
    addr := DefaultAddress

    if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
        addr = options.Addrs[0]
    }

    h := &httpBroker{
        id:          uuid.New().String(),
        address:     addr,
        opts:        options,
        r:           options.Registry,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
        inbox:       make(map[string][][]byte),
    }

    // specify the message handler
    h.mux.Handle(DefaultPath, h)

    // get optional handlers
    if h.opts.Context != nil {
        handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
        if ok {
            for pattern, handler := range handlers {
                h.mux.Handle(pattern, handler)
            }
        }
    }

    return h
}

这里做了几件事

  1. 初始化options,设置Codec为json,设置ctx,Registry
  2. 初始化httpBroker,设置http.Client时调用newTransport()设置代理,同时启用http2,最后指定message handler

h.mux.Handle(DefaultPath, h)h就是httpBroker,在httpBroker中实现了ServeHTTP(),则所有请求都通过他来处理,即所有订阅的消息处理都是通过httpBroker.ServeHTTP()来处理的

  1. 如果ctx不为空,就取http_handlers数组,依次注册http handle

下面看看example/broker, 一个broker的示例

var (
    topic = "go.micro.topic.foo"
)

func pub() {
    tick := time.NewTicker(time.Second)
    i := 0
    for _ = range tick.C {
        msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func sub() {
    _, err := broker.Subscribe(topic, func(p broker.Event) error {
        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })
    if err != nil {
        fmt.Println(err)
    }
}

func main() {
    cmd.Init()

    if err := broker.Init(); err != nil {
        log.Fatalf("Broker Init error: %v", err)
    }
    if err := broker.Connect(); err != nil {
        log.Fatalf("Broker Connect error: %v", err)
    }

    go pub()
    go sub()

    <-time.After(time.Second * 10)
}

cmd.init()请见micro cmd篇,有详细介绍
cmd.opts.Broker默认使用的是上文分析的httpBroker
先看broker.Init()

func (h *httpBroker) Init(opts ...Option) error {
    h.RLock()
    if h.running {
        h.RUnlock()
        return errors.New("cannot init while connected")
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    for _, o := range opts {
        o(&h.opts)
    }

    if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
        h.address = h.opts.Addrs[0]
    }

    if len(h.id) == 0 {
        h.id = "go.micro.http.broker-" + uuid.New().String()
    }

    // get registry
    reg := h.opts.Registry
    if reg == nil {
        reg = registry.DefaultRegistry
    }

    // get cache
    if rc, ok := h.r.(cache.Cache); ok {
        rc.Stop()
    }

    // set registry
    h.r = cache.New(reg)

    // reconfigure tls config
    if c := h.opts.TLSConfig; c != nil {
        h.c = &http.Client{
            Transport: newTransport(c),
        }
    }

    return nil
}

这里做了以下几件事情

  1. 上读锁,检查是否正在运行
  2. 上读写锁,在进行后面操作

    1. 设置opt
    2. 设置address,id
    3. 获取Registry,cache,设置registry
    4. 设置http.Client中Transport的tls

再看broker.Connect()

func (h *httpBroker) Connect() error {
    h.RLock()
    if h.running {
        h.RUnlock()
        return nil
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    var l net.Listener
    var err error

    if h.opts.Secure || h.opts.TLSConfig != nil {
        config := h.opts.TLSConfig

        fn := func(addr string) (net.Listener, error) {
            if config == nil {
                hosts := []string{addr}

                // check if its a valid host:port
                if host, _, err := net.SplitHostPort(addr); err == nil {
                    if len(host) == 0 {
                        hosts = maddr.IPs()
                    } else {
                        hosts = []string{host}
                    }
                }

                // generate a certificate
                cert, err := mls.Certificate(hosts...)
                if err != nil {
                    return nil, err
                }
                config = &tls.Config{Certificates: []tls.Certificate{cert}}
            }
            return tls.Listen("tcp", addr, config)
        }

        l, err = mnet.Listen(h.address, fn)
    } else {
        fn := func(addr string) (net.Listener, error) {
            return net.Listen("tcp", addr)
        }

        l, err = mnet.Listen(h.address, fn)
    }

    if err != nil {
        return err
    }

    addr := h.address
    h.address = l.Addr().String()

    go http.Serve(l, h.mux)
    go func() {
        h.run(l)
        h.Lock()
        h.opts.Addrs = []string{addr}
        h.address = addr
        h.Unlock()
    }()

    // get registry
    reg := h.opts.Registry
    if reg == nil {
        reg = registry.DefaultRegistry
    }
    // set cache
    h.r = cache.New(reg)

    // set running
    h.running = true
    return nil
}

这里做了以下几件事情

  1. 上读锁,检查是否正在运行
  2. 上读写锁,在进行后面操作

    1. 如果有Secure和TLSConfig,做一些tls的设置,没有则直接返回默认net.Listener
    2. 开一个协程运行http.Serve,处理请求是newHttpBroker()中指定的handle函数ServeHTTP()( 标准库http提供了Handler接口,用于开发者实现自己的handler。只要实现接口的ServeHTTP方法即可。)
    3. 开一个协程运行run(),后面看,设置地址
    4. 获取Registry,设置cache
    5. 标记httpBroker正在运行

看看刚才的run()做了什么

func (h *httpBroker) run(l net.Listener) {
    t := time.NewTicker(registerInterval)
    defer t.Stop()

    for {
        select {
        // heartbeat for each subscriber
        case <-t.C:
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
                }
            }
            h.RUnlock()
        // received exit signal
        case ch := <-h.exit:
            ch <- l.Close()
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    _ = h.r.Deregister(sub.svc)
                }
            }
            h.RUnlock()
            return
        }
    }
}

这里做了以下几件事情

  1. 设置默认30秒一次的时间触发器,每个周期都在服务发现中心依次注册subscribers中的订阅
  2. 接受退出消息,依次调用Deregister()注销subscribers中的订阅

接下来看看怎么订阅消息broker.Subscribe()

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
    var err error
    var host, port string
    options := NewSubscribeOptions(opts...)

    // parse address for host, port
    host, port, err = net.SplitHostPort(h.Address())
    if err != nil {
        return nil, err
    }

    addr, err := maddr.Extract(host)
    if err != nil {
        return nil, err
    }

    var secure bool

    if h.opts.Secure || h.opts.TLSConfig != nil {
        secure = true
    }

    // register service
    node := &registry.Node{
        Id:      topic + "-" + h.id,
        Address: mnet.HostPort(addr, port),
        Metadata: map[string]string{
            "secure": fmt.Sprintf("%t", secure),
            "broker": "http",
            "topic":  topic,
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {
        version = broadcastVersion
    }

    service := &registry.Service{
        Name:    serviceName,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    node.Id,
        topic: topic,
        fn:    handler,
        svc:   service,
    }

    // subscribe now
    if err := h.subscribe(subscriber); err != nil {
        return nil, err
    }

    // return the subscriber
    return subscriber, nil
}

func (h *httpBroker) subscribe(s *httpSubscriber) error {
    h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
        return err
    }

    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

这里做了以下几件事情

  1. 初始化SubscribeOptions
  2. 解析Address中host, port,并验证ip
  3. 初始化registry.Node{}
  4. 检查options.Queue,设置registry.Service{},
  5. 生成订阅信息结构体httpSubscriber{}
  6. 调用subscribe(),返回subscriber

    1. 开读写锁,把订阅服务(Connect()中开了http.Serve())注册到服务发现(默认mdns),发消息的时候通过服务发现找node,就是往这些注册的服务中发了
    2. 订阅频道记录到h.subscribers中

获取消息通过调用Subscribe时传递的处理函数,示例如下

broker.Subscribe(topic, func(p broker.Event) error {
        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })

Event及httpEvent定义

// Event is given to a subscription handler for processing
type Event interface {
    Topic() string
    Message() *Message
    Ack() error
    Error() error
}

type httpEvent struct {
    m   *Message
    t   string
    err error
}

func (h *httpEvent) Ack() error {
    return nil
}

func (h *httpEvent) Error() error {
    return h.err
}

func (h *httpEvent) Message() *Message {
    return h.m
}

func (h *httpEvent) Topic() string {
    return h.t
}

可以看到httpEvent实现了Event,这样p.Message()就可以得到消息了
获取消息在ServeHTTP()中,收到消息,调用传入的fn处理即可

下面再看发布消息,只需要定义broker.Message,再调用broker.Publish()即可,示例如下

msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }

看看broker.Publish()

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    // create the message first
    m := &Message{
        Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {
        m.Header[k] = v
    }

    m.Header["Micro-Topic"] = topic

    // encode the message
    b, err := h.opts.Codec.Marshal(m)
    if err != nil {
        return err
    }

    // save the message
    h.saveMessage(topic, b)

    // now attempt to get the service
    h.RLock()
    s, err := h.r.GetService(serviceName)
    if err != nil {
        h.RUnlock()
        return err
    }
    h.RUnlock()

    pub := func(node *registry.Node, t string, b []byte) error {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {
            scheme = "https"
        }

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err != nil {
            return err
        }

        // discard response body
        io.Copy(ioutil.Discard, r.Body)
        r.Body.Close()
        return nil
    }

    srv := func(s []*registry.Service, b []byte) {
        for _, service := range s {
            var nodes []*registry.Node

            for _, node := range service.Nodes {
                // only use nodes tagged with broker http
                if node.Metadata["broker"] != "http" {
                    continue
                }

                // look for nodes for the topic
                if node.Metadata["topic"] != topic {
                    continue
                }

                nodes = append(nodes, node)
            }

            // only process if we have nodes
            if len(nodes) == 0 {
                continue
            }

            switch service.Version {
            // broadcast version means broadcast to all nodes
            case broadcastVersion:
                var success bool

                // publish to all nodes
                for _, node := range nodes {
                    // publish async
                    if err := pub(node, topic, b); err == nil {
                        success = true
                    }
                }

                // save if it failed to publish at least once
                if !success {
                    h.saveMessage(topic, b)
                }
            default:
                // select node to publish to
                node := nodes[rand.Int()%len(nodes)]

                // publish async to one node
                if err := pub(node, topic, b); err != nil {
                    // if failed save it
                    h.saveMessage(topic, b)
                }
            }
        }
    }

    // do the rest async
    go func() {
        // get a third of the backlog
        messages := h.getMessage(topic, 8)
        delay := (len(messages) > 1)

        // publish all the messages
        for _, msg := range messages {
            // serialize here
            srv(s, msg)

            // sending a backlog of messages
            if delay {
                time.Sleep(time.Millisecond * 100)
            }
        }
    }()

    return nil
}

这里做了以下几件事情

  1. 创建消息,header中添加一个Micro-Topic,值是topic
  2. 编码消息,默认json
  3. 调用saveMessage()保存消息

    1. httpBroker.mtx加锁
    2. 在httpBroker.inbox中添加这条消息,如果inbox
数组大于64条则只取前64条重赋值给inbox,【`震惊,后面直接丢了!!!`】
  1. 调用httpBroker.r.GetService(serviceName)获取service,serviceName默认"micro.http.broker"
  2. 创建处理函数pub()

    1. 确定scheme[http或https]
    2. url参数中增加id,值为node.id
    3. 拼接uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
    4. post发起请求,返回结果放入ioutil.Discard(就是丢掉了),关闭返回body
  3. 创建处理函数srv()

    1. 收集所有可用的registry.Node
    2. 判断消息广播还是指定了Queue(指定了Queue随机选一个node),根据情况异步调用pub()发送,失败则重新调用saveMessage()
  4. 创建协程

    1. 调用getMessage()

      1. 开读写锁h.mtx.Lock()
      2. 从inbox取出指定条数的消息
    2. 依次用第6步的srv()处理每条消息,如果刚才取出的消息大于1条,每次发送间隔Millisecond*100

至此,整个broker的流程比较清晰了。

总结:

  1. 默认的http broker,订阅就是开了一个http服务手消息,发布就是从服务发现拿到节点信息,往节点post数据。
  2. 实际使用中通常可以指定etcd,consul这样的服务发现。
  3. 消息inbox最多只放64条,尽量避免消息堆积,消息最好写日志

写到这里又要推荐大家看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:舞林

查看原文:go micro broker

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:1006366459

80 次点击  
加入收藏 微博
上一篇:go micro client
下一篇:go micro cmd
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传