golang 订阅发布机制实现

GoSnail · · 1367 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

大家使用较多的生产消费模式中间件,大都是订阅发布机制,反过来大家用这些中间件肯定都会用,一方subscribe(订阅)一个topic,另一方publish(发布)消息向该topic,订阅了该topic的subsciber都会收到该消息,反之,则收不到。那么如果让大家手动实现个这个机制,大家向下是否能够实现出来?小伙伴们不妨切到自己的开发工具上手动敲下代码代码试试。如果没问题的可以不用往下看了,浪费大家的时间,如果敲不出来的可以继续往下看,我带大家一起边敲边梳理这个流程。

想下我们在使用订阅发布组件的时候,编写demo的例程的逻辑步骤

1、创建publisher对象,这样就要有publisher类

 2、创建publisher 的client,也就是subscriber

3、subscriber定于publisher的topic

4、publisher发布消息

5、subscriber可以收到publisher发布的消息

至此完成。

首先我们写main,因为我之前也试图先写子函数,以及在看资料的时候,资料的例子往往从struct的定义写起,这样不利于自己思考,而且即使看完之后知识还是在那里安静的躺着,并没有被吸收到你的大脑里。好的,废话不多说。将上面的流程在main函数中体现出来:

func main() {

    p :=NewPublisher()    //对应上面的第一步

    s := NewSubscriber()    //对应上面的第二步

    p.addSubscriber(s)    //对应上面的第三部

       })

    p.publish("Hello, golang")     //对应上面的第4步

    go func() {    //对应上面的第5步

        if msg <-s {

            fmt.Println(msg)

        }

    }()

    time.Sleep(time.Second *3)

}

就这么简单,上面是一段伪代码,下面我们在一点点的填充丰富下代码。

第一个publiser类,需要填充下信息,大家想下最基础的publisher需要有哪些成员,1、超时时间,以便在publish消息时发生阻塞;2、存储第三部添加进来的subscriber,这里可以使用slice存储;

第二步subscriber类,大家想下该类中应该有哪些成员,1、channel interface{},因为publisher向subscriber发布消息的时候,只需向每个subsciber发布msg就可以了,subsciber监听这个channel,有消息打印即可;2、既然有channel相应的就要知道channel的buf大小,所有第二个成员buffer。

就这么简单的demo 我们实现下:

func main() {

    p :=newPublisher(100 *time.Millisecond)

    defer p.close()

    s :=newSubsciber(10)

    p.addSubscribe(*s)

    p.publish("Hello, golang")

    p.publish("Hello,php")

    p.publish("Hello,java")

    go func() {

        for msg :=range s.ch {

            fmt.Println(msg)    

        }

    }()

    sig :=make(chan os.Signal)

    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

    select {

        case <-sig:

            fmt.Println("finished!!!")

       }

}


//发布者对象

type publisher struct {

    timeout time.Duration

    subscribers []subscriber

}

type subscriber struct {

    buffer   int

    ch chan interface{}

}

func newPublisher(timeout time.Duration) *publisher {

    return &publisher{timeout: timeout}

}

func newSubsciber(buf int) *subscriber {

    return &subscriber{buffer: buf, ch:make(chan interface{}, buf)}

}

func (p *publisher)addSubscribe(sub subscriber) {

    p.subscribers =append(p.subscribers, sub)

}

func (p *publisher) close() {

    for i, _ :=range p.subscribers {

        close(p.subscribers[i].ch)

        p.subscribers =append(p.subscribers[0:i], p.subscribers[i+1:]...)

    }

}

func (p *publisher)publish(v interface{}) {

    var wg sync.WaitGroup

   for _, sub :=range p.subscribers {

        wg.Add(1)

        go p.sendTopic(sub, v, &wg)

    }

}

func (p *publisher)sendTopic(sub subscriber, vinterface{}, wg *sync.WaitGroup) {

    defer wg.Done()

    select {

        case sub.ch <- v:

        case <-time.After(p.timeout):

    }

}

Ok,一个简单的订阅发布模型完成。接下来再增加点难度,根据发布的内容,某个subscriber订阅包含特殊字符串消息的消息,如果没有则不发送,该怎么实现?


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:GoSnail

查看原文:golang 订阅发布机制实现

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1367 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传