go综合实例之发布订阅模型

skeyboy · · 872 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

``` import ( "fmt" "strings" "sync" "time" ) type ( subscriber chan interface{} //订阅者 topicFunc func(v interface{}) bool //主题过滤器 ) type Publisher struct { m sync.RWMutex //读写锁 buffer int //订阅队列缓存大小 timeout time.Duration //发布超时 subscribers map[subscriber]topicFunc //订阅者信息 } //新建一个发布者对象,可以设置发布超时和缓存队列打长度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 添加一个新的订阅者,订阅过滤筛选后的主题 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // 添加一个订阅者 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } //退出订阅 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() delete(p.subscribers, sub) close(sub) } func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } } //发送主题 可以容忍一定的超时 func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { defer wg.Done() /* 两种情况 1 过滤器为nil 则全部放行,也就是对所有的主题全部订阅的人 2 过滤出感兴趣的主题,然后写入管道中 */ if topic != nil && !topic(v) {//过滤出感兴趣的主题 return } select { case sub <- v: case <-time.After(p.timeout): } } //发布主题 func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() //通过waitgroup来等待collection中管道finished var wg sync.WaitGroup for sub, topic := range p.subscribers {//每次发布一个消息就发送给所有的订阅者 wg.Add(1) go p.sendTopic(sub, topic, v, &wg) } wg.Wait() } func Test() { p := NewPublisher(100*time.Millisecond, 10) defer p.Close() all := p.Subscribe() golang := p.SubscribeTopic(func(v interface{}) bool { if s, ok := v.(string); ok { return strings.Contains(s, "golang") } return false }) p.Publish("hello, world") p.Publish("hello, golang") go func() { for msg := range all { fmt.Println("golang", msg) } }() go func() { for msg := range golang { fmt.Println("golang", msg) } }() time.Sleep(3 * time.Second) } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

872 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传