先说一个实际的业务场景:
Client端有一个请求需要进行耗时处理或者查询,这个处理在Server端做。Server 端处理完后通知给请求的Client端。
这种场景可以称之为RPC(Remote Procedure Call)
有两个点说明一下:
- <1>Client端发送请求给Server端可以简单定义一个Queue。Client作为Producer发布消息,Server端作为Consumer消费消息
- <2>Server端处理完耗时处理后需要将处理结果返回给请求的客户端。
- 可以在Client端声明一个不指定名称的Queue,系统会自动生成一个随机名称的Queue。在publish时通过ReplyTo属性将该Queue的名称发送给Server端
- 因为Server端要将处理结果返回给对应的请求,所以在Client端需要生成一个CorrelationId发送给Server端
处理流程
Client端
(1)声明从Server返回消息用的queue
// Declare the reply queue on which the server's response will arrive.
respQueue, err := ch.QueueDeclare(
"", // name: empty, so the broker generates a unique queue name
false, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
(2)发送请求消息到rpc_queue
// Publish the request through the default ("") exchange, routed by queue name.
err = ch.Publish(
"", // exchange
config.QUEUENAME, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: correlationID, // lets the client match the reply to this request
ReplyTo: respQueue.Name, // queue the server should publish the reply to
Body: []byte(msgBody),
})
correlationID为Client端自己随机生成的Id,用于在收到应答时匹配对应的请求
Server端
(3)声明rpc_queue,从rpc_queue中消费消息
// Declare the request queue that clients publish to.
q, err := ch.QueueDeclare(
config.QUEUENAME, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
// Start consuming requests from the queue declared above.
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack: off, so each delivery has to be acknowledged explicitly
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
(4)执行处理后使用msg中的ReplyTo返回处理结果给Client
// Reply on the queue named by the request's ReplyTo, copying its CorrelationId.
err = ch.Publish(
"", // exchange
msg.ReplyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: msg.CorrelationId,
Body: []byte(bookName),
})
msg.Ack(false) // multiple=false: acknowledge only this delivery
(5)Client端从reply queue中接收从Server端来的response
// Consume replies from the client's own reply queue.
respMsgs, err := ch.Consume(
respQueue.Name, // queue
"", // consumer
true, // auto-ack
true, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
详细代码如下
conf.go
package config
// Settings shared by the RPC client and server.
const (
// RMQADDR is the AMQP URL passed to amqp.Dial.
RMQADDR = "amqp://guest:guest@172.17.84.205:5672/"
// QUEUENAME is the queue that carries RPC requests from client to server.
QUEUENAME = "rpc_queue"
// SERVERINSTANCESCNT is the number of consumer goroutines the server
// starts; the server also passes it to Qos as the prefetch count.
SERVERINSTANCESCNT = 5
)
client.go
package main
import (
config "binTest/rabbitmqTest/t1/l6/conf"
"fmt"
"log"
"math/rand"
"os"
"github.com/streadway/amqp"
)
// main sends its first command-line argument as an RPC request to the queue
// named config.QUEUENAME, then waits on a private reply queue and prints the
// body of the first reply whose CorrelationId matches the request.
func main() {
	if len(os.Args) < 2 {
		log.Println("Arguments error")
		return
	}
	msgBody := os.Args[1]

	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// An empty name makes the broker generate a unique queue name, which is
	// then handed to the server through the ReplyTo property below.
	respQueue, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // noWait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a response queue")

	correlationID := randomID(32)
	err = ch.Publish(
		"",               // exchange
		config.QUEUENAME, // routing key
		false,            // mandatory
		false,            // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: correlationID,
			ReplyTo:       respQueue.Name,
			Body:          []byte(msgBody),
		})
	// Check the publish result before reporting the message as sent.
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", msgBody)

	respMsgs, err := ch.Consume(
		respQueue.Name, // queue
		"",             // consumer
		true,           // auto-ack
		true,           // exclusive
		false,          // noLocal
		false,          // noWait
		nil,            // arguments
	)
	failOnError(err, "Failed to register a consumer")

	// Skip any delivery that does not belong to this request.
	for item := range respMsgs {
		if item.CorrelationId == correlationID {
			fmt.Println("response:", string(item.Body))
			break
		}
	}
}
// failOnError aborts the program when err is non-nil: it logs msg together
// with err and then panics, so deferred cleanup still runs while the caller
// never continues with an unusable connection, channel or queue.
// It does nothing when err is nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
// randomID returns a string of length random decimal digits ('0'-'9'),
// used as the correlation ID of an RPC request.
// It returns "" when length is zero or negative.
func randomID(length int) string {
	if length <= 0 {
		return ""
	}
	digits := make([]byte, length)
	for i := range digits {
		// Offset from '0' so every byte is a printable digit rather than a
		// raw control character in the range 0x00-0x08.
		digits[i] = '0' + byte(rand.Intn(10))
	}
	return string(digits)
}
server.go
package main
import (
config "binTest/rabbitmqTest/t1/l6/conf"
"fmt"
"log"
"math/rand"
"time"
"github.com/streadway/amqp"
)
// main connects to RabbitMQ and starts config.SERVERINSTANCESCNT goroutines
// that each consume requests from the queue named config.QUEUENAME, compute
// the result with queryBookID and publish it to the request's ReplyTo queue
// with the request's CorrelationId. It then blocks forever.
func main() {
	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.Qos(
		config.SERVERINSTANCESCNT, // prefetch count
		0,                         // prefetch size
		false,                     // global
	)
	failOnError(err, "Failed to set QoS")

	// Nothing is ever sent on this channel; receiving from it keeps main alive.
	forever := make(chan bool)
	for routine := 0; routine < config.SERVERINSTANCESCNT; routine++ {
		go func(routineNum int) {
			q, err := ch.QueueDeclare(
				config.QUEUENAME, // name
				false,            // durable
				false,            // delete when unused
				false,            // exclusive
				false,            // noWait
				nil,              // arguments
			)
			failOnError(err, "Failed to declare a queue")

			msgs, err := ch.Consume(
				q.Name, // queue
				"",     // consumer
				false,  // auto-ack
				false,  // exclusive
				false,  // noLocal
				false,  // noWait
				nil,    // arguments
			)
			failOnError(err, "Failed to register a consumer")

			for msg := range msgs {
				log.Printf("In %d start consuming message: %s\n", routineNum, msg.Body)
				bookName := queryBookID(string(msg.Body))
				err = ch.Publish(
					"",          // exchange
					msg.ReplyTo, // routing key
					false,       // mandatory
					false,       // immediate
					amqp.Publishing{
						ContentType:   "text/plain",
						CorrelationId: msg.CorrelationId,
						Body:          []byte(bookName),
					})
				if err != nil {
					fmt.Println("Failed to reply msg to client:", err)
				} else {
					fmt.Println("Response to client:", bookName)
				}
				// The request is acknowledged even when the reply could not
				// be published, so it is not delivered again.
				if err := msg.Ack(false); err != nil {
					log.Printf("Failed to ack message: %s", err)
				}
			}
		}(routine)
	}
	<-forever
}
// failOnError aborts the program when err is non-nil: it logs msg together
// with err and then panics, so deferred cleanup still runs while the caller
// never continues with an unusable connection, channel or queue.
// It does nothing when err is nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
// queryBookID simulates a slow lookup: it sleeps for a random whole number
// of seconds between 0 and 8, then returns bookID prefixed with "QUERIED_".
func queryBookID(bookID string) string {
	delay := time.Duration(rand.Intn(9)) * time.Second
	time.Sleep(delay)
	return "QUERIED_" + bookID
}
执行效果
Client端
Server端
全部代码可以在如下处取得
https://github.com/BinWang-sh...
有疑问加站长微信联系(非本文作者)