NSQ是一个实时的分布式消息平台。它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。
NSQ是由3个进程组成的:
- nsqd 是一个接收、排队、然后转发消息到客户端的进程。
- nsqlookupd管理拓扑信息并提供最终一致性的发现服务。
- nsqadmin用于实时查看集群的统计数据(并且执行各种各样的管理任务)。
1 源码部署
软件下载直接去官网:https://nsq.io/deployment/installing.html
cd /usr/local/nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
访问 nsqadmin
2 docker部署
获取镜像
docker pull nsqio/nsq
启动容器
- 运行lookupd
~docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
450cbab82b8eb491d42bf105185c1022010b4d05e65a04f6c52ba15e1f5af06f
- 获取docker host的IP地址
~ docker inspect -f '{{ .NetworkSettings.IPAddress }}' lookupd
172.17.0.2
- 运行nsqd
# --broadcast-address=广播到虚拟机地址
~ docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.1 --lookupd-tcp-address=172.17.0.2:4160
3bc0901c8c485c351cfe31b0ef1a4fa32bf6bf148f0d74907afec6cbb1e4a034
- 运行nsqadmin
~ docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=172.17.0.2:4161
1d4cb219b862613d42bbc0f0bd7d08146f48a32d4e68abae2073cf28ed765bb0
注意:宿主机防火墙是否有拦截
- 查看docker容器是否正常启动运行
~ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
1d4cb219b862 nsqio/nsq "/nsqadmin --lookupd…" 3 minutes ago Up 3 minutes 4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp nsqadmin
3bc0901c8c48 nsqio/nsq "/nsqd --broadcast-a…" 3 minutes ago Up 3 minutes 4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp nsqd
450cbab82b8e nsqio/nsq "/nsqlookupd" 4 minutes ago Up 4 minutes 4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp lookupd
- 访问nsqadmin
3 docker-compose部署
version: '2'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
networks:
- nsq-network
hostname: nsqlookupd
ports:
- "4161:4161"
- "4160:4160"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
# command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 -broadcast-address=虚拟机地址
depends_on:
- nsqlookupd
hostname: nsqd
networks:
- nsq-network
ports:
- "4151:4151"
- "4150:4150"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
hostname: nsqadmin
ports:
- "4171:4171"
networks:
- nsq-network
networks:
nsq-network:
配置检查
docker-compose config
启动 docker-compose
➜ nsq docker-compose up -d
Starting nsq_nsqlookupd_1_a12f31d6a776 ... done
Starting nsq_nsqd_1_1c0db410157f ... done
Starting nsq_nsqadmin_1_8c94f3c4a1b7 ... done
访问nsqadmin
客户端支持的库
https://nsq.io/clients/client_libraries.html
golang客户端使用
发送消息
方式一
package main
import (
"bytes"
"fmt"
"net/http"
)
func main() {
httpclient := &http.Client{}
data := `haha`
endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", 4151, "pub", "test")
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer([]byte(data)))
resp, err := httpclient.Do(req)
if err != nil {
fmt.Printf(err.Error())
return
}
if resp.StatusCode != 200 {
fmt.Printf("%s status code: %d", "pub", resp.StatusCode)
}
defer resp.Body.Close()
}
方式二:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"io/ioutil"
"log"
"sync"
"time"
)
var err error
// 推送消息
func main() {
url := "127.0.0.1:4150"
topicName := "test"
config := nsq.NewConfig()
// new
producer, err := nsq.NewProducer(url, config)
if err != nil {
fmt.Println("nsq.NewProducer", err)
return
}
fmt.Println("nsq.NewProducer", "√")
defer producer.Stop()
producer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
// ping
err = producer.Ping()
if err != nil {
fmt.Println("producer.Ping", err)
return
}
fmt.Println("producer.Ping", "√")
msgCt:=1000
wg := &sync.WaitGroup{}
wg.Add(msgCt)
// 测试10 次
for i := 0; i < msgCt; i++ {
// 消息内容
msg := time.Now().Format("0102150405")
sendMessage(producer, topicName, msg)
wg.Done()
time.Sleep(10*time.Millisecond)
// time.Sleep(1 * time.Second)
}
wg.Wait()
fmt.Println("producer.Push.Status", "ok")
}
// 发送消息
func sendMessage(producer *nsq.Producer, topicName string, msg string) {
err = producer.Publish(topicName, []byte(msg))
if err != nil {
fmt.Println("producer.Publish", err)
return
}
fmt.Println("producer.Publish",msg, "√")
}
消费记录
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"io/ioutil"
"log"
"sync"
)
func main() {
testNSQ()
}
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
const (
TOPIC = "test"
CHANNEL_1 = "consumer_channel_1"
CHANNEL_2 = "consumer_channel_2"
URL = "127.0.0.1:4150"
)
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config := nsq.NewConfig()
config.MaxInFlight = 10
for i := 0; i < 10; i++ {
consumer, err := nsq.NewConsumer(TOPIC, CHANNEL_1, config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD(URL)
if nil != err {
fmt.Println("err", err)
return
}
fmt.Println(CHANNEL_1,i)
}
select {}
}()
waiter.Wait()
}
有疑问加站长微信联系(非本文作者)