golang nats request reply 模式

luckyase · · 1527 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

请求响应模式

无论是发布订阅模式还是queue模式,nats都不能保证消息一定发送到订阅方,除非订阅者发送一个响应给发布者。
所以订阅者发送一个回执给发布者,就是请求响应模式。

这种模式有什么用?

nats要求订阅者一定要先完成订阅,发布消息后,订阅者才能收到消息,类似离线消息的模式nats不支持。就算先完成订阅,后发送消息,消息发送方也不知道是否有订阅者收到了消息,请求响应模式就是应对这种情况。

基本流程

A发送消息,B收到消息,发送回执给A。这就是request reply的基本流程。

基本实现原理

  • A启用request模式发送消息(消息中包含了回执信息,replya主题),同步等待回执(有超时时间)。
  • B收到消息,在消息中取出回执信息=replay主题,对replay主题,主动发送普通消息(消息内容可自定义,比如server A上的service1收到msgid=xxxx的消息。)。
  • A在超时内收到消息,确认结束。
  • A在超时内未收到消息,超时结束。

注意

  • 因为A发送的消息中包装了回执测相关信息,订阅者B收到消息后,也要主动发送回执,所以请求响应模式,对双方都有影响。
  • A发送消息后,等待B的回执,需要给A设置超时时间,超时后,不在等待回执,直接结束,效果和不需要回执的消息发送一样,不在关心是否有订阅者收到消息。

两种模式

request reply有两种模式:

  • one to one 默认模式

1条消息,N个订阅者,消息发送方,仅会收到一条回执记录(因为消息发送方收到回执消息后,就自动断开了对回执消息的订阅。),即使N个订阅都都收到了消息。注意:pub/sub和queue模式的不同

  • one to many 非默认模式,需要自己实现

1条消息,N个订阅者,消息发送方,可以自己设定一个数量限制N,接受到N个回执消息后,断开对回执消息的订阅。

Server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc *nats.Conn

    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err) {
        }
    }
}

func main() {
    var (
        servername = flag.String("servername", "Y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()

    mode := "queue"
    if *queueGroup == "" {
        mode = "pub/sub"
    }
    log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)

    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    nc.Flush()
    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    replyMsg := name + " Received a msg"
    if queue == "" {
        nc.Subscribe(subj, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    } else {
        nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    }

}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

Client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc         *nats.Conn
    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err, func() {

    }) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err, func() {

            }) {

        }
    }
}

func main() {
    var (
        subj = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 3; i++ {
        id := uuid.New()
        log.Println(id)
        if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
            // handle err
        }) {
            log.Println(string(msg.Data))
        }
    }
}

func checkErr(err error, errFun func()) bool {
    if err != nil {
        log.Println(err)
        errFun()
        return false
    }
    return true
}

pub/sub模式启动

$ ./main
2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
2018/08/18 18:54:26 Y worker2 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello

发送消息

$ ./main
2018/08/18 18:54:26 yasenagat
2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
2018/08/18 18:54:26 Y worker3 Received a msg
2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
2018/08/18 18:54:26 Y worker2 Received a msg
2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
2018/08/18 18:54:26 Y worker2 Received a msg

queue模式启动

$ ./main -group=test
2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
2018/08/18 19:14:33 Y worker2 Received a message From Async :  4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
2018/08/18 19:14:33 Y worker3 Received a message From Async :  4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
2018/08/18 19:14:33 Y worker2 Received a message From Async :  38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg

发送消息

$ ./main
2018/08/18 19:14:33 yasenagat
2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
2018/08/18 19:14:33 Y worker2 Received a msg
2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
2018/08/18 19:14:33 Y worker3 Received a msg
2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
2018/08/18 19:14:33 Y worker2 Received a msg

queue模式下,发送3条消息,3个订阅者有相同的queue,每条消息只有一个订阅者收到。

pub/sub模式下,发送3条消息,3个订阅者都收到3条消息,一共9条。

总结:

回执主要解决:订阅者是否收到消息的问题、有多少个订阅者收到消息的问题。(不是具体业务是否执行完成的回执!)
基于事件的架构模式可以构建于消息机制之上,依赖消息机制。异步调用的其中一种实现方式,就是基于事件模式。异步调用又是分布式系统中常见的任务处理方式。

业务模式

  • 业务A发送eventA给事件中心,等待回执
  • 事件中心告知A收到了消息,开始对外发送广播
  • 订阅者B订阅了eventA主题
  • 事件中心对eventA主题发送广播,等待回执
  • B收到消息,告知事件中心,收到eventA,开始执行任务taskA
  • B异步执行完taskA,通知事件中心taskAComplete,等待回执
  • 事件中心发送回执给B,对外发送广播,taskAComplete
  • ........

如果超时,未能收到回执,需要回执信息的确认方可以主动调用相关接口,查询任务执行状态,根据任务状态做后续的处理。


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:luckyase

查看原文:golang nats request reply 模式

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1527 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传