继续进入下一个初始化
n.netService, err = nebnet.NewNebService(n)
if err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to setup net service.")
}
netservice有两个成员
type NebServicestruct {
node *Node
dispatcher *Dispatcher
}
跳出stup()函数
先进入start()函数看一看
if err := n.netService.Start(); err != nil {
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Fatal("Failed to start net service.")
}
进入netservice.start()
func (ns *NebService) Start() error {
logging.CLog().Info("Starting NebService...")
// start dispatcher.
ns.dispatcher.Start()
// start node.
if err := ns.node.Start(); err != nil {
ns.dispatcher.Stop()
logging.CLog().WithFields(logrus.Fields{
"err": err,
}).Error("Failed to start NebService.")
return err
}
logging.CLog().Info("Started NebService.")
return nil
}
可以看到第一个start()的函数是dispatcher.start()
进入dispatch.start()
func (dp *Dispatcher) Start() {
logging.CLog().Info("Starting NebService Dispatcher...")
go dp.loop()
}
然后就出现一个新的线程、goruntime
go dp.loop()
进入该线程,看它干了些什么
timerChan := time.NewTicker(time.Second).C
for {
select {
case <-timerChan:
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))
case <-dp.quitCh:
logging.CLog().Info("Stoped NebService Dispatcher.")
return
case msg := <-dp.receivedMessageCh:
msgType := msg.MessageType()
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
m, _ := v.(*sync.Map)
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan <- msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispatch message.")
}
return true
})
}
}
一个有点长的循环
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒钟刷新一次缓冲区
case msg := <-dp.receivedMessageCh:
msgType := msg.MessageType()如果能取出dp.receivedMessageCh
msgType := msg.MessageType()首先判断取出的信息类型
v, _ := dp.subscribersMap.Load(msgType)
if v == nil {
continue
}
根据类型取出相应的map
如果取不出,那么使用continue结束这个case
m, _ := v.(*sync.Map)
断言
m.Range(func(key, valueinterface{}) bool {
select {
case key.(*Subscriber).msgChan <- msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispa+tch message.")
}
return true
})
将msg推入其他管道里面去。其他goruntime会循环等待该
有疑问加站长微信联系(非本文作者)