micro.newService()中newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {
o(&opt)
}
return opt
}
初始化了一堆基础设置,先来看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
}
for _, o := range opts {
o(&options)
}
// set address
addr := DefaultAddress
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
// specify the message handler
h.mux.Handle(DefaultPath, h)
// get optional handlers
if h.opts.Context != nil {
handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {
h.mux.Handle(pattern, handler)
}
}
}
return h
}
这里做了几件事
- 初始化options,设置Codec为json,设置ctx,Registry
- 初始化httpBroker,设置http.Client时调用newTransport()设置代理,同时启用http2,最后指定message handler
h.mux.Handle(DefaultPath, h)
h就是httpBroker,在httpBroker中实现了ServeHTTP(),则所有请求都通过他来处理,即所有订阅的消息处理都是通过httpBroker.ServeHTTP()来处理的
- 如果ctx不为空,就取http_handlers数组,依次注册http handle
下面看看example/broker, 一个broker的示例
var (
topic = "go.micro.topic.foo"
)
func pub() {
tick := time.NewTicker(time.Second)
i := 0
for _ = range tick.C {
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
func sub() {
_, err := broker.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
if err != nil {
fmt.Println(err)
}
}
func main() {
cmd.Init()
if err := broker.Init(); err != nil {
log.Fatalf("Broker Init error: %v", err)
}
if err := broker.Connect(); err != nil {
log.Fatalf("Broker Connect error: %v", err)
}
go pub()
go sub()
<-time.After(time.Second * 10)
}
cmd.init()请见micro cmd篇,有详细介绍
cmd.opts.Broker默认使用的是上文分析的httpBroker
先看broker.Init()
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
这里做了以下几件事情
- 上读锁,检查是否正在运行
上读写锁,在进行后面操作
- 设置opt
- 设置address,id
- 获取Registry,cache,设置registry
- 设置http.Client中Transport的tls
再看broker.Connect()
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
这里做了以下几件事情
- 上读锁,检查是否正在运行
上读写锁,在进行后面操作
- 如果有Secure和TLSConfig,做一些tls的设置,没有则直接返回默认net.Listener
- 开一个协程运行http.Serve,处理请求是
newHttpBroker()
中指定的handle函数ServeHTTP()
( 标准库http提供了Handler接口,用于开发者实现自己的handler。只要实现接口的ServeHTTP方法即可。) - 开一个协程运行run(),后面看,设置地址
- 获取Registry,设置cache
- 标记httpBroker正在运行
看看刚才的run()
做了什么
func (h *httpBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
return
}
}
}
这里做了以下几件事情
- 设置默认30秒一次的时间触发器,每个周期都在服务发现中心依次注册subscribers中的订阅
- 接受退出消息,依次调用
Deregister()
注销subscribers中的订阅
接下来看看怎么订阅消息broker.Subscribe()
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
这里做了以下几件事情
- 初始化SubscribeOptions
- 解析Address中host, port,并验证ip
- 初始化registry.Node{}
- 检查options.Queue,设置registry.Service{},
- 生成订阅信息结构体httpSubscriber{}
调用subscribe(),返回subscriber
- 开读写锁,把订阅服务(Connect()中开了http.Serve())注册到服务发现(默认mdns),发消息的时候通过服务发现找node,就是往这些注册的服务中发了
- 订阅频道记录到h.subscribers中
获取消息通过调用Subscribe时传递的处理函数,示例如下
broker.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
Event及httpEvent定义
// Event is given to a subscription handler for processing
type Event interface {
Topic() string
Message() *Message
Ack() error
Error() error
}
type httpEvent struct {
m *Message
t string
err error
}
func (h *httpEvent) Ack() error {
return nil
}
func (h *httpEvent) Error() error {
return h.err
}
func (h *httpEvent) Message() *Message {
return h.m
}
func (h *httpEvent) Topic() string {
return h.t
}
可以看到httpEvent实现了Event,这样p.Message()就可以得到消息了
获取消息在ServeHTTP()中,收到消息,调用传入的fn处理即可
下面再看发布消息,只需要定义broker.Message
,再调用broker.Publish()
即可,示例如下
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
看看broker.Publish()
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}
这里做了以下几件事情
- 创建消息,header中添加一个
Micro-Topic
,值是topic - 编码消息,默认json
调用saveMessage()保存消息
- httpBroker.mtx加锁
- 在httpBroker.inbox中添加这条消息,如果inbox
数组大于64条则只取前64条重赋值给inbox,【`震惊,后面直接丢了!!!`】
- 调用
httpBroker.r.GetService(serviceName)
获取service,serviceName默认"micro.http.broker" 创建处理函数
pub()
- 确定scheme[http或https]
- url参数中增加id,值为node.id
- 拼接uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
- post发起请求,返回结果放入ioutil.Discard(就是丢掉了),关闭返回body
创建处理函数
srv()
- 收集所有可用的registry.Node
- 判断消息广播还是指定了Queue(指定了Queue随机选一个node),根据情况异步调用
pub()
发送,失败则重新调用saveMessage()
创建协程
调用getMessage()
- 开读写锁h.mtx.Lock()
- 从inbox取出指定条数的消息
- 依次用第6步的
srv()
处理每条消息,如果刚才取出的消息大于1条,每次发送间隔Millisecond*100
至此,整个broker的流程比较清晰了。
总结:
- 默认的http broker,订阅就是开了一个http服务手消息,发布就是从服务发现拿到节点信息,往节点post数据。
- 实际使用中通常可以指定etcd,consul这样的服务发现。
- 消息inbox最多只放64条,尽量避免消息堆积,消息最好写日志
写到这里又要推荐大家看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker
有疑问加站长微信联系(非本文作者)