beehive是kubeedge中的一个通信框架,整个kubeEdge的cloud和edge模块都依赖于beehive框架。
不弄清楚beehive的工作原理,就很难分析kubeEdge整体的逻辑。
目录结构
├── pkg
│ ├── common
│ │ ├── config
│ │ │ ├── config.go
│ │ │ └── config_test.go
│ │ └── util
│ │ ├── conn.go
│ │ ├── conn_test.go
│ │ ├── file_util.go
│ │ └── parse_resource.go
│ └── core
│ ├── context
│ │ ├── context.go
│ │ ├── context_channel.go
│ │ ├── context_channel_test.go
│ │ ├── context_factory.go
│ │ └── context_unixsocket.go
│ ├── core.go
│ ├── model
│ │ └── message.go
│ └── module.go
beehive的整体的代码量是比较少的,主要就是core中的几个文件
;初步看beehive实现了两种通信机制,一种是通过unixsocket,另一种是通过golang的channel。
使用
beehive并不是单独能运行的模块,而是直接被其他模块引用的。
core.Register(MODULE)
// start all modules
core.Run()
首先对需要使用beehive的模块进行注册,都注册完成以后,调用Run
init
init是加载的时候默认调用的,
func init() {
modules = make(map[string]Module)
disabledModules = make(map[string]Module)
config.AddConfigChangeCallback(moduleChangeCallback{})
eventListener := config.EventListener{Name: "eventListener1"}
config.CONFIG.RegisterListener(eventListener, "modules.enabled")
}
modules被保存到一个map中。disabledModules没啥用,就是可以配置禁用哪些模块。
其他和config相关的内容也没啥用,就是监控配置文件变化来控制模块的启用、停用的。
Register
register只是把模块以名称为key放到map中。
Run
func StartModules() {
beehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)
modules := GetModules()
for name, module := range modules {
//Init the module
beehiveContext.AddModule(name)
//Assemble typeChannels for sendToGroup
beehiveContext.AddModuleGroup(name, module.Group())
go module.Start()
klog.Infof("Starting module %v", name)
}
}
run方法调用了StartModules方法,这里详细分析一下
InitContext
参数MsgCtxTypeChannel是硬编码在源码中的,配的是"channel"
func InitContext(contextType string) {
once.Do(func() {
ctx, cancel := gocontext.WithCancel(gocontext.Background())
context = &beehiveContext{
ctx: ctx,
cancel: cancel,
}
switch contextType {
case MsgCtxTypeChannel:
channelContext := NewChannelContext()
context.messageContext = channelContext
context.moduleContext = channelContext
default:
klog.Fatalf("Do not support context type:%s", contextType)
}
})
}
创建了一个空的cancel context,并把context的引用和cancel方法保存到一个全局的beehiveContext变量中,定义如下:
type beehiveContext struct {
moduleContext ModuleContext
messageContext MessageContext
ctx gocontext.Context
cancel gocontext.CancelFunc
}
messageContext和moduleContext指向的是同一个用NewChannelContext创建的channelContext。
func NewChannelContext() *ChannelContext {
channelMap := make(map[string]chan model.Message)
moduleChannels := make(map[string]map[string]chan model.Message)
anonChannels := make(map[string]chan model.Message)
return &ChannelContext{
channels: channelMap,
typeChannels: moduleChannels,
anonChannels: anonChannels,
}
}
可以看到,channelContext实际上就包含3个空的map,map的key是一个string类型(从后面的代码看,是一个uuid),
channelMap和anonChannels比较简单,value就是一个类型为model.Message的channel;
moduleChannels比较复杂,value又是一个map,这个map的key是string,value是类型为model.Message的channel。
---
回到run中,接下来就是遍历每个之前注册的module
beehiveContext.AddModule
// AddModule adds module into module context
func AddModule(module string) {
context.moduleContext.AddModule(module)
}
//AddModule adds module into module context
func (ctx *ChannelContext) AddModule(module string) {
channel := ctx.newChannel()
ctx.addChannel(module, channel)
}
// New Channel
func (ctx *ChannelContext) newChannel() chan model.Message {
channel := make(chan model.Message, ChannelSizeDefault)
return channel
}
// addChannel return chan
func (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message) {
ctx.chsLock.Lock()
defer ctx.chsLock.Unlock()
ctx.channels[module] = moduleCh
}
这里实际上就是给beehiveContext中的moduleContext添加了一个类型为model.Message的channel,并保存到channels这个map中。
beehiveContext.AddModuleGroup
// AddModuleGroup adds module into module context group
func AddModuleGroup(module, group string) {
context.moduleContext.AddModuleGroup(module, group)
}
//AddModuleGroup adds modules into module context group
func (ctx *ChannelContext) AddModuleGroup(module, group string) {
if channel := ctx.getChannel(module); channel != nil {
ctx.addTypeChannel(module, group, channel)
return
}
klog.Warningf("Get bad module name %s when addmodulegroup", module)
}
这里就是把刚才创建的channel以group的名字为key创建一个map,然后把整个map放到moduleContext的typeChannels中。
group的名字是硬编码在实现module接口的具体module上的,比如cloudhub的的Group返回的就是"cloudhub"
func (a *cloudHub) Group() string {
return "cloudhub"
}
module.Start
最后就是调用每个module的start方法了。
Start是一个接口,每个具体的module都实现了自己的Start方法
具体module的Start方法,后续放到各个模块的分析中,这里就不仔细看了。
context
context目录下主要有context、context_channel以及context_factory。context中定义了接口,context_factory提供对外的操作接口实现,context_channel则是在context_factory中调用。context_factory与context_channel存在同名的接口。
Done
Done方法比较简单,直接调用了go context的Done方法
Cancel
Cancel调用的是beehiveContext的cancel方法,而beehiveContext的cancel方法就是创建go context时候返回的cancel方法
Cleanup
Cleanup调用的是context.moduleContext.Cleanup(module),也就是ChannelContext的Cleanup方法
func (ctx *ChannelContext) Cleanup(module string) {
if channel := ctx.getChannel(module); channel != nil {
ctx.delChannel(module)
// decrease probable exception of channel closing
time.Sleep(20 * time.Millisecond)
close(channel)
}
}
// deleteChannel by module name
func (ctx *ChannelContext) delChannel(module string) {
// delete module channel from channels map
ctx.chsLock.Lock()
_, exist := ctx.channels[module]
if !exist {
klog.Warningf("Failed to get channel, module:%s", module)
return
}
delete(ctx.channels, module)
ctx.chsLock.Unlock()
// delete module channel from typechannels map
ctx.typeChsLock.Lock()
for _, moduleMap := range ctx.typeChannels {
if _, exist := moduleMap[module]; exist {
delete(moduleMap, module)
break
}
}
ctx.typeChsLock.Unlock()
}
这里就是根据module的名称,在ChannelContext的那几个map中删掉对应的channel的引用,
最后关闭channel.
Send
Send方法是调用context.messageContext.Send(module, message),也就是调用了ChannelContext的Send方法。
// Send send msg to a module. Todo: do not stuck
func (ctx *ChannelContext) Send(module string, message model.Message) {
// avoid exception because of channel colsing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when send message, exception: %+v", exception)
}
}()
if channel := ctx.getChannel(module); channel != nil {
channel <- message
return
}
klog.Warningf("Get bad module name :%s when send message, do nothing", module)
}
ChannelContext的Send方法根据module的名字先取出channel,然后放入message。
Receive
调用message, err := context.messageContext.Receive(module)。
和上面的Send对应,就是从module名称对应的channel中取出message
SendSync
// SendSync sends message in a sync way
func (ctx *ChannelContext) SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error) {
// avoid exception because of channel colsing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendsync message, exception: %+v", exception)
}
}()
if timeout <= 0 {
timeout = MessageTimeoutDefault
}
deadline := time.Now().Add(timeout)
// make sure to set sync flag
message.Header.Sync = true
// check req/resp channel
reqChannel := ctx.getChannel(module)
if reqChannel == nil {
return model.Message{}, fmt.Errorf("bad request module name(%s)", module)
}
sendTimer := time.NewTimer(timeout)
select {
case reqChannel <- message:
case <-sendTimer.C:
return model.Message{}, errors.New("timeout to send message")
}
sendTimer.Stop()
// new anonymous channel for response
anonChan := make(chan model.Message)
anonName := getAnonChannelName(message.GetID())
ctx.anonChsLock.Lock()
ctx.anonChannels[anonName] = anonChan
ctx.anonChsLock.Unlock()
defer func() {
ctx.anonChsLock.Lock()
delete(ctx.anonChannels, anonName)
close(anonChan)
ctx.anonChsLock.Unlock()
}()
var resp model.Message
respTimer := time.NewTimer(time.Until(deadline))
select {
case resp = <-anonChan:
case <-respTimer.C:
return model.Message{}, errors.New("timeout to get response")
}
respTimer.Stop()
return resp, nil
}
上半部分就是在Send的基础上加了一个计时器,如果向通道写入消息超时,那么久返回空消息体和error信息。
下半部分创建了一个匿名的通道(实际上也是有名字的),并保存到ChannelContext的ctx.anonChannels这个map中。
需要注意的是,这里所谓的匿名通道,实际上是有名字的(message.GetID()),只是这个名字可能是message在创建时候生成的一个随机的uuid。
这个名字我们放到分析message的时候再看。
接下来就是在这个新的匿名通道上等待消息,并且也设置了超时时间,如果超时了,就返回空的消息和error。
总结一下,sendSync就是先发送一个message了,并以这个message的id创建一个新的通道,然后在这个通道上等待回复。
SendResponse
调用context.messageContext.SendResp(resp)。
// SendResp send resp for this message when using sync mode
func (ctx *ChannelContext) SendResp(message model.Message) {
anonName := getAnonChannelName(message.GetParentID())
ctx.anonChsLock.RLock()
defer ctx.anonChsLock.RUnlock()
if channel, exist := ctx.anonChannels[anonName]; exist {
channel <- message
return
}
klog.Warningf("Get bad anonName:%s when sendresp message, do nothing", anonName)
}
sendResp和之前的sendSync是对应的,就是向sendSync中创建的匿名通道中,发送消息。
这里的通道名词是通过message.GetParentID()获取的,注意创建的时候,名词用的是GetID,这里怎么对应也放到message模块再分析。
SendToGroup
调用context.messageContext.SendToGroup(moduleType, message)
这里就是根据module type的名字查出对应的通道列表,然后分别调用Send方法
SendToGroupSync
调用context.messageContext.SendToGroupSync(moduleType, message, timeout)
// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel
// check timeout and the size of anon channel
func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {
// avoid exception because of channel colsing
// TODO: need reconstruction
defer func() {
if exception := recover(); exception != nil {
klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)
}
}()
if timeout <= 0 {
timeout = MessageTimeoutDefault
}
deadline := time.Now().Add(timeout)
channelList := ctx.getTypeChannel(moduleType)
if channelList == nil {
return fmt.Errorf("failed to get module type(%s) channel list", moduleType)
}
// echo module must sync a response,
// let anonchan size be module number
channelNumber := len(channelList)
anonChan := make(chan model.Message, channelNumber)
anonName := getAnonChannelName(message.GetID())
ctx.anonChsLock.Lock()
ctx.anonChannels[anonName] = anonChan
ctx.anonChsLock.Unlock()
cleanup := func() error {
ctx.anonChsLock.Lock()
delete(ctx.anonChannels, anonName)
close(anonChan)
ctx.anonChsLock.Unlock()
var uninvitedGuests int
// cleanup anonchan and check parentid for resp
for resp := range anonChan {
if resp.GetParentID() != message.GetID() {
uninvitedGuests++
}
}
if uninvitedGuests != 0 {
klog.Errorf("Get some unexpected:%d resp when sendToGroupsync message", uninvitedGuests)
return fmt.Errorf("got some unexpected(%d) resp", uninvitedGuests)
}
return nil
}
// make sure to set sync flag before sending
message.Header.Sync = true
var timeoutCounter int32
send := func(ch chan model.Message) {
sendTimer := time.NewTimer(time.Until(deadline))
select {
case ch <- message:
sendTimer.Stop()
case <-sendTimer.C:
atomic.AddInt32(&timeoutCounter, 1)
}
}
for _, channel := range channelList {
go send(channel)
}
sendTimer := time.NewTimer(time.Until(deadline))
ticker := time.NewTicker(TickerTimeoutDefault)
for {
// annonChan is full
if len(anonChan) == channelNumber {
break
}
select {
case <-ticker.C:
case <-sendTimer.C:
cleanup()
if timeoutCounter != 0 {
errInfo := fmt.Sprintf("timeout to send message, several %d timeout when send", timeoutCounter)
return fmt.Errorf(errInfo)
}
klog.Error("Timeout to sendToGroupsync message")
return fmt.Errorf("Timeout to send message")
}
}
return cleanup()
}
SendToGroupSync并没有调用SendToGroup方法,而是直接调用了Send方法。
首先创建了一个size和group中channel数量一样的匿名通道,然后遍历通道,调用send发送消息,然后等待匿名通道收到消息的数量等于size。
还有个要注意的地方,这里并没有吧匿名通道的消息取出来返回出去,而是调用了cleanup方法直接清理通道。也就是说,用这种方式发送消息,只能得到是否消息都得到回复,而不能得到具体回复的内容。
model/message
message.go中定义了通道中使用的消息格式,
// Message struct
type Message struct {
Header MessageHeader `json:"header"`
Router MessageRoute `json:"route,omitempty"`
Content interface{} `json:"content"`
}
//MessageRoute contains structure of message
type MessageRoute struct {
// where the message come from
Source string `json:"source,omitempty"`
// where the message will broadcasted to
Group string `json:"group,omitempty"`
// what's the operation on resource
Operation string `json:"operation,omitempty"`
// what's the resource want to operate
Resource string `json:"resource,omitempty"`
}
//MessageHeader defines message header details
type MessageHeader struct {
// the message uuid
ID string `json:"msg_id"`
// the response message parentid must be same with message received
// please use NewRespByMessage to new response message
ParentID string `json:"parent_msg_id,omitempty"`
// the time of creating
Timestamp int64 `json:"timestamp"`
// the flag will be set in sendsync
Sync bool `json:"sync,omitempty"`
}
Message由MessageRoute、MessageHeader以及Content接口组成。 整个message都是json转换过来的。
这些类型上关联的方法,这里就不一一看了,主要都是些get、set方法,看几个特殊的:
UpdateID
//UpdateID returns message object updating its ID
func (msg *Message) UpdateID() *Message {
msg.Header.ID = uuid.NewV4().String()
return msg
}
可以看出,updateId是新生成了一个uuid放到header中。
NewMessage
func NewMessage(parentID string) *Message {
msg := &Message{}
msg.Header.ID = uuid.NewV4().String()
msg.Header.ParentID = parentID
msg.Header.Timestamp = time.Now().UnixNano() / 1e6
return msg
}
生成一个新的message结构,需要传入parentID,自己的ID则是随机生成的
Clone
生成一个新的ID,其他字段与被clone的message一致
NewRespByMessage
// NewRespByMessage returns a new response message by a message received
func (msg *Message) NewRespByMessage(message *Message, content interface{}) *Message {
return NewMessage(message.GetID()).SetRoute(message.GetSource(), message.GetGroup()).
SetResourceOperation(message.GetResource(), ResponseOperation).
FillBody(content)
}
注意到NewMessage的参数是parentID,所以这个Response Message中,实际上就是用了收到的message的id作为parenet的id。
route则是用收到的message的source。
有疑问加站长微信联系(非本文作者)