到目前为止,我们项目的结果大致如下:
传感器生成的模拟数据(包含传感器名称、数据、时间戳)是通过传感器在运行时动态创建的 Queue 来发送的。这些 Queue 很难直接被发现。
为了解决这个问题,我创建了另一个消息,它包含各传感器的 Queue 的路由 key,这个消息是在一个“众所周知”的 Queue 上发布的,所以协调器就可以得到传感器的路由信息。
传感器的数据是发布在默认的 Direct Exchange 上,也就是说只有一个消费者可以收到这个消息,这就是我们想要的效果。具体的,无论有多少个协调器,RabbitMQ 会保证只有一个协调器会收到信息,并且只会收到一次。
然后,用于发现传感器的路径确有不同的需求,如果存在多个协调器,那么当传感器上线的时候,所有的协调器都必须得知,所以就不能使用 Direct Exchange 了。这时使用 Fanout Exchange 就比较合理了,Fanout Exchange 将会同时通知所有附加在 Exchange 上面的 Queue,也就是把传感器的路由信息发送给所有在线的协调器。
但是这也有其他问题:如果没有接收者监听,那么这些路由信息不会保留,这个问题稍后再解决,我们先把发布路由信息的 Exchange 从 Direct 改为 Fanout。
使用 Fanout Exchange 发布传感器路由信息
目前,在传感器项目中,我们使用默认的 Direct Exchange 来发布传感器路由消息:
看一下管理控制台,可以看到 RabbitMQ 还提供了一个 Fanout Exchange 也就是 amq.fanout:
修改代码,暂时改用 amq.fanout 来发布传感器路由信息:
首先,删除第 38 行的代码,它原是用来创建一个 Queue 以便协调程序可以接收到传感器的路由信息。现在,这个工作将由 Exchange 的消费者们来完成,它们会创建自己的 Queue 来监听这个 Exchange。
第 43 行,把路由 Key 改为 “”,因为 Fanout Exchange 不需要使用该 Key 来决定消息发往哪里,它会把消息进行复制并发送到每个绑定到它的 Queue 上面。
最后,第 42 行,把 exchange 这个参数改为 amq.fanout。
运行 sensors 项目查看效果
打开控制台:
可以看到 amq.fanout 确实有数据了,尽管现在的消息传递速率为 0。
点进去:
可以看到一个路由信息,但是因为没有任何 Queue 绑定到这个 Exchange,这个消息就丢失了,因为消息无处可发。
重建协调器
在最早几节内容中,我做了一个非常简单的协调器程序,它可以简单的发布和接收消息。为了配合我们的应用场景,我们需要建立一个更健壮一些的协调器。它的主要职责是:通过消息代理(RabbitMQ)与传感器进行交互。
不过首先,为了代码复用,我对现有的项目结构进行调整:
我把项目的外层目录名从 sensors 改为 demo,然后在里面建立sensors 文件夹,把 main.go 移动到 sensors 里面,并改名为 sensor.go。
然后建立 coordinator 文件夹,在里面建立 queuelistener.go 文件,内容较多,我分为三个图展示:
第 15 行,建立 QueueListener struct,它里面包含发现传感器数值 Queue 的逻辑,接收它们的消息,并把它们在一个事件聚合器里面翻译成事件。不过目前它主要聚焦获取消息这项工作,所以它有三个字段:
到 RabbitMQ 的连接
在该连接上的 Channel
一个 Map,当作注册表,里面存放着这个协调器所监听的源,使用 Map 可以防止将同一个传感器注册两次,而当传感器下线的时候可以通过这个 Map 来关闭监听(这个我就不实现了)
第 21 行,建立一个构造函数,它可以返回一个 *QueueListener
第 31 行创建一个方法 ListenForNewSource:
它可以让 QueueListener 发现新的传感器,在这里创建 Queue 的时候,我们不关心 Queue 的名称,所以 name 参数为“”,这样的话 RabbitMQ 会为它创建一个唯一的名称。
但是当 Queue 被创建时,它会默认绑定到 Direct Exchange。而在之前,我刚把代码修改为让传感器通过 amq.fanout Exchange 来发布它们的信息,所以我们需要把这个 Queue 重新绑定到那个上面。这里就使用 Channel 上的 QueueBind 方法来实现(第 33 行)。
QueueBind 方法参数:
第一个参数是刚刚创建的 Queue 的名称,这就是要绑定的 Queue
第二个参数是路由 Key,由于 Fanout Exchange 会忽略这个参数,所以这里写“”
第三个参数是要绑定的 Exchange 的名称,也就是 amq.fanout
第四个参数,如果把 noWait 设置为 true,那么万一绑定不成功,就会把 Channel 关闭。这里我把它设为 false,因为我知道 Exchange 和 Queue 都会存在,如果失败,那么会关闭 Channel 并发生错误。
第五个参数不需要,设为 nil
第 40 行,设置消息的接收,返回 Go Channel,这里的参数需要用到 Queue 的名称
第 49 行,通过 for range 来处理通过 Go Channel 发过来的消息。如果接收到消息,表示有新的传感器上线了。
第 50 行,在有传感器上线后,通过 Consume 方法和 msg.Body(也就是传感器的名称),来读取传感器的模拟数据。记得我们把传感器的模拟数据发布到了默认的 Direct Exchange 上面,所以每次只会把消息传递给一个接收者,这意味着,当我注册了多个协调器的时候,它们将共享到这些 Queue 的访问,当这些发生的时候,RabbitMQ 将会轮流传送给每一个注册的接收者。这也就允许我们对协调器进行横向扩展,而且不影响整个系统其余的部分。
第 59 行,判断传感器是否在该协调器中注册,如果没有,那就进行注册。
第 62 行,使用 goroutine 来调用 AddListener 方法,该方法代码如下:
这个方法将会监听 Go Channel 中的消息
在里面使用 for range 来等待 Go Channel 传送消息
在这里,我们把二进制数据转化为我们可以在程序里使用的数据,也就是 SensorMessage 类型
然后暂时先打印即可
建立协调器的 main
在 coordinator 目录下建立 exec 文件夹,目的是创建 main package,在里面创建 main.go 代码如下:
第 9 行,我们创建一个 QueueListener
第 10 行,使用 goroutine 让他进行监听,防止阻塞主线程
第 12-13 行的目的就是让程序一直存活,防止 goroutine 停止运行。
最后 sensor.go 里面有一处代码需要修改,在 main 函数的 for 循环里面,每次使用 encoder 的时候都需要 重新创建一个,所以我添加了 63 行的代码:
运行
我们运行一下试试,注意:一定要先运行 coordinator 项目,然后再运行 sensors 项目,否则会有问题。 下面左侧是 coordinator,右侧是 sensors:
可以看到 coordinator(协调器)可以读取到传感器的数据了。
这里我们使用了一个最简单最基本的机制来做传感器 Queue 的发现。
有疑问加站长微信联系(非本文作者)