大家使用较多的生产消费模式中间件,大都是订阅发布机制,反过来大家用这些中间件肯定都会用,一方subscribe(订阅)一个topic,另一方publish(发布)消息向该topic,订阅了该topic的subsciber都会收到该消息,反之,则收不到。那么如果让大家手动实现个这个机制,大家向下是否能够实现出来?小伙伴们不妨切到自己的开发工具上手动敲下代码代码试试。如果没问题的可以不用往下看了,浪费大家的时间,如果敲不出来的可以继续往下看,我带大家一起边敲边梳理这个流程。
想下我们在使用订阅发布组件的时候,编写demo的例程的逻辑步骤
1、创建publisher对象,这样就要有publisher类
2、创建publisher 的client,也就是subscriber
3、subscriber定于publisher的topic
4、publisher发布消息
5、subscriber可以收到publisher发布的消息
至此完成。
首先我们写main,因为我之前也试图先写子函数,以及在看资料的时候,资料的例子往往从struct的定义写起,这样不利于自己思考,而且即使看完之后知识还是在那里安静的躺着,并没有被吸收到你的大脑里。好的,废话不多说。将上面的流程在main函数中体现出来:
func main() {
p :=NewPublisher() //对应上面的第一步
s := NewSubscriber() //对应上面的第二步
p.addSubscriber(s) //对应上面的第三部
})
p.publish("Hello, golang") //对应上面的第4步
go func() { //对应上面的第5步
if msg <-s {
fmt.Println(msg)
}
}()
time.Sleep(time.Second *3)
}
就这么简单,上面是一段伪代码,下面我们在一点点的填充丰富下代码。
第一个publiser类,需要填充下信息,大家想下最基础的publisher需要有哪些成员,1、超时时间,以便在publish消息时发生阻塞;2、存储第三部添加进来的subscriber,这里可以使用slice存储;
第二步subscriber类,大家想下该类中应该有哪些成员,1、channel interface{},因为publisher向subscriber发布消息的时候,只需向每个subsciber发布msg就可以了,subsciber监听这个channel,有消息打印即可;2、既然有channel相应的就要知道channel的buf大小,所有第二个成员buffer。
就这么简单的demo 我们实现下:
func main() {
p :=newPublisher(100 *time.Millisecond)
defer p.close()
s :=newSubsciber(10)
p.addSubscribe(*s)
p.publish("Hello, golang")
p.publish("Hello,php")
p.publish("Hello,java")
go func() {
for msg :=range s.ch {
fmt.Println(msg)
}
}()
sig :=make(chan os.Signal)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sig:
fmt.Println("finished!!!")
}
}
//发布者对象
type publisher struct {
timeout time.Duration
subscribers []subscriber
}
type subscriber struct {
buffer int
ch chan interface{}
}
func newPublisher(timeout time.Duration) *publisher {
return &publisher{timeout: timeout}
}
func newSubsciber(buf int) *subscriber {
return &subscriber{buffer: buf, ch:make(chan interface{}, buf)}
}
func (p *publisher)addSubscribe(sub subscriber) {
p.subscribers =append(p.subscribers, sub)
}
func (p *publisher) close() {
for i, _ :=range p.subscribers {
close(p.subscribers[i].ch)
p.subscribers =append(p.subscribers[0:i], p.subscribers[i+1:]...)
}
}
func (p *publisher)publish(v interface{}) {
var wg sync.WaitGroup
for _, sub :=range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, v, &wg)
}
}
func (p *publisher)sendTopic(sub subscriber, vinterface{}, wg *sync.WaitGroup) {
defer wg.Done()
select {
case sub.ch <- v:
case <-time.After(p.timeout):
}
}
Ok,一个简单的订阅发布模型完成。接下来再增加点难度,根据发布的内容,某个subscriber订阅包含特殊字符串消息的消息,如果没有则不发送,该怎么实现?
有疑问加站长微信联系(非本文作者)