nsq简介
nsq是go实现的高性能消息队列,部署相当简单。
一.搭建nsq集群
1.拉取docker镜像
docker pull nsqio/nsq #拉取nsq镜像
docker images #查看镜像
2.启动nsqlookup服务
docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
3.启动两个nsqd节点服务
nsqd是nsq里面保存数据的服务
- 在服务器192.168.6.100上启动第一个nsqd节点
docker run --net host -v /media/guo/wd5400tb2/nsq:/media/nsq --name nsqd00 nsqio/nsq /nsqd --tcp-address :4150 --http-address :4151 --broadcast-address=192.168.6.100 --lookupd-tcp-address=192.168.6.100:4160 --data-path /media/nsq
-v 指定挂载目录
-v /media/guo/st7200tb2/nsq:/media/nsq
,把本机的/media/guo/st7200tb2/nsq
挂载到/media/nsq
目录下面
- 在服务器192.168.6.100上启动第二个nsqd节点
docker run --net host -v /media/guo/st7200tb2/nsq:/media/nsq --name nsqd01 nsqio/nsq /nsqd --tcp-address :4250 --http-address :4251 --broadcast-address=192.168.6.100 --lookupd-tcp-address=192.168.6.100:4160 --data-path /media/nsq
--data-path /media/nsq
指定nsqd数据保存目录
4.启动admin节点
docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.6.100:4161
浏览器打开:http://192.168.6.100:4171/nodes ,此时的NSQd Nodes
应该有两个,如果看到说明环境部署成功,恭喜了
二.curl 创建topic和channel
1.创建topic
curl -X POST http://192.168.6.100:4151/topic/create?topic=test-topic
2.创建channel
curl -X POST 'http://192.168.6.100:4151/channel/create?topic=test-topic&channel=test-channel'
三.测试
使用curl+nsq_to_file测试
- 向nsq写入消息
curl -d 'hello world 1' 'http://192.168.6.100:4151/pub?topic=test-topic'
- 使用nsq_to_file消费nsq里面的消息
nsq_to_file --topic=test-topic --output-dir=/tmp --lookupd-http-address=192.168.6.100:4161
使用golang测试
先启动服务端,用于消费消息,其中代码里指定nsqlookup服务的地址
server 端代码(消费消息)
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"time"
)
type myMessageHandler struct{}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
fmt.Printf("body is empty\n")
return nil
}
fmt.Printf("------------>:%s\n", m.Body)
//err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return nil
}
func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig()
config.MaxInFlight = 2
consumer, err := nsq.NewConsumer("test-topic", "test-channel", config)
if err != nil {
log.Fatal(err)
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{})
// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("192.168.6.100:4161")
if err != nil {
log.Fatal(err)
}
// Gracefully stop the consumer.
for {
time.Sleep(time.Second)
}
consumer.Stop()
}
client端(生产消息)
启动client用于向nsq写入消息。其中代码里面指定了一个nsqd服务的地址
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
func main() {
config := nsq.NewConfig()
producer, err := nsq.NewProducer("192.168.6.100:4250", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "test-topic"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
// Gracefully stop the producer.
producer.Stop()
}