golang p2p网

· · 1963 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

继续进入下一个初始化

n.netService, err = nebnet.NewNebService(n)

if err != nil {

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Fatal("Failed to setup net service.")

}

netservice有两个成员

type NebServicestruct {

node      *Node

dispatcher *Dispatcher

}

跳出stup()函数

先进入start()函数看一看

if err := n.netService.Start(); err != nil {

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Fatal("Failed to start net service.")

}

进入netservice.start()

func (ns *NebService) Start() error {

logging.CLog().Info("Starting NebService...")

// start dispatcher.

  ns.dispatcher.Start()

// start node.

  if err := ns.node.Start(); err != nil {

ns.dispatcher.Stop()

logging.CLog().WithFields(logrus.Fields{

"err": err,

}).Error("Failed to start NebService.")

return err

}

logging.CLog().Info("Started NebService.")

return nil

}

可以看到第一个start()的函数是dispatcher.start()

进入dispatch.start()

func (dp *Dispatcher) Start() {

logging.CLog().Info("Starting NebService Dispatcher...")

go dp.loop()

}

然后就出现一个新的线程、goruntime

go dp.loop()

进入该线程,看它干了些什么

timerChan := time.NewTicker(time.Second).C

for {

select {

case <-timerChan:

metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))

case <-dp.quitCh:

logging.CLog().Info("Stoped NebService Dispatcher.")

return

  case msg := <-dp.receivedMessageCh:

msgType := msg.MessageType()

v, _ := dp.subscribersMap.Load(msgType)

if v == nil {

continue

      }

m, _ := v.(*sync.Map)

m.Range(func(key, valueinterface{}) bool {

select {

case key.(*Subscriber).msgChan <- msg:

default:

logging.VLog().WithFields(logrus.Fields{

"msgType": msgType,

}).Warn("timeout to dispatch message.")

}

return true

      })

}

}

一个有点长的循环

metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))一秒钟刷新一次缓冲区

case msg := <-dp.receivedMessageCh:

msgType := msg.MessageType()如果能取出dp.receivedMessageCh

msgType := msg.MessageType()首先判断取出的信息类型

v, _ := dp.subscribersMap.Load(msgType)

if v == nil {

continue

}

根据类型取出相应的map

如果取不出,那么使用continue结束这个case

m, _ := v.(*sync.Map)

断言

m.Range(func(key, valueinterface{}) bool {

select {

case key.(*Subscriber).msgChan <- msg:

default:

logging.VLog().WithFields(logrus.Fields{

"msgType": msgType,

}).Warn("timeout to dispa+tch message.")

}

return true

})

将msg推入其他管道里面去。其他goruntime会循环等待该


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:

查看原文:golang p2p网

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1963 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传