nsq消息队列部署以及使用

谁不曾年少轻狂过 · · 638 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

NSQ是一个实时的分布式消息平台。它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。

NSQ是由3个进程组成的:

  • nsqd 是一个接收、排队、然后转发消息到客户端的进程。
  • nsqlookupd管理拓扑信息并提供最终一致性的发现服务。
  • nsqadmin用于实时查看集群的统计数据(并且执行各种各样的管理任务)。

1 源码部署

软件下载直接去官网:https://nsq.io/deployment/installing.html

cd /usr/local/nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

访问 nsqadmin

img

2 docker部署

获取镜像

docker pull nsqio/nsq

启动容器

  • 运行lookupd
~docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
450cbab82b8eb491d42bf105185c1022010b4d05e65a04f6c52ba15e1f5af06f
  • 获取docker host的IP地址
~ docker inspect -f '{{ .NetworkSettings.IPAddress }}' lookupd
172.17.0.2
  • 运行nsqd
# --broadcast-address=广播到虚拟机地址
~ docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.1 --lookupd-tcp-address=172.17.0.2:4160
3bc0901c8c485c351cfe31b0ef1a4fa32bf6bf148f0d74907afec6cbb1e4a034
  • 运行nsqadmin
~ docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --lookupd-http-address=172.17.0.2:4161
1d4cb219b862613d42bbc0f0bd7d08146f48a32d4e68abae2073cf28ed765bb0

注意:宿主机防火墙是否有拦截

  • 查看docker容器是否正常启动运行
~ docker ps -a 
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                            NAMES
1d4cb219b862        nsqio/nsq           "/nsqadmin --lookupd…"   3 minutes ago       Up 3 minutes        4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp   nsqadmin
3bc0901c8c48        nsqio/nsq           "/nsqd --broadcast-a…"   3 minutes ago       Up 3 minutes        4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp   nsqd
450cbab82b8e        nsqio/nsq           "/nsqlookupd"            4 minutes ago       Up 4 minutes        4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp   lookupd
  • 访问nsqadmin
img

3 docker-compose部署

version: '2'

services:

  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    networks:
      - nsq-network
    hostname: nsqlookupd
    ports:
      - "4161:4161"
      - "4160:4160"

  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    # command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 -broadcast-address=虚拟机地址
    depends_on:
      - nsqlookupd
    hostname: nsqd
    networks:
      - nsq-network
    ports:
      - "4151:4151"
      - "4150:4150"

  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    hostname: nsqadmin
    ports:
      - "4171:4171"
    networks:
      - nsq-network

networks:
  nsq-network:

配置检查

docker-compose config

启动 docker-compose

➜  nsq docker-compose up -d
Starting nsq_nsqlookupd_1_a12f31d6a776 ... done
Starting nsq_nsqd_1_1c0db410157f       ... done
Starting nsq_nsqadmin_1_8c94f3c4a1b7   ... done

访问nsqadmin

img

客户端支持的库

https://nsq.io/clients/client_libraries.html

img

golang客户端使用

发送消息

方式一

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {

    httpclient := &http.Client{}
    data := `haha`

    endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", 4151, "pub", "test")
    req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer([]byte(data)))
    resp, err := httpclient.Do(req)
    if err != nil {
        fmt.Printf(err.Error())
        return
    }
    if resp.StatusCode != 200 {
        fmt.Printf("%s status code: %d", "pub", resp.StatusCode)
    }
    defer resp.Body.Close()

}

方式二:

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "sync"
    "time"
)

var err error

// 推送消息
func main() {

    url := "127.0.0.1:4150"
    topicName := "test"
    config := nsq.NewConfig()

    // new
    producer, err := nsq.NewProducer(url, config)
    if err != nil {
        fmt.Println("nsq.NewProducer", err)
        return
    }
    fmt.Println("nsq.NewProducer", "√")
    defer producer.Stop()

    producer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
    //  ping
    err = producer.Ping()
    if err != nil {
        fmt.Println("producer.Ping", err)
        return
    }
    fmt.Println("producer.Ping", "√")

    msgCt:=1000
    wg := &sync.WaitGroup{}
    wg.Add(msgCt)
    // 测试10 次
    for i := 0; i < msgCt; i++ {

        // 消息内容
        msg :=  time.Now().Format("0102150405")
        sendMessage(producer, topicName, msg)
        wg.Done()

        time.Sleep(10*time.Millisecond)
        // time.Sleep(1 * time.Second)
    }

    wg.Wait()
    fmt.Println("producer.Push.Status", "ok")
}

// 发送消息
func sendMessage(producer *nsq.Producer, topicName string, msg string) {

    err = producer.Publish(topicName, []byte(msg))
    if err != nil {
        fmt.Println("producer.Publish", err)
        return
    }
    fmt.Println("producer.Publish",msg, "√")

}

消费记录

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "sync"
)


func main() {
    testNSQ()
}

type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

const (
    TOPIC   = "test"
    CHANNEL_1 = "consumer_channel_1"
    CHANNEL_2 = "consumer_channel_2"
    URL     = "127.0.0.1:4150"
)

func testNSQ() {

    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()

        config := nsq.NewConfig()
        config.MaxInFlight = 10

        for i := 0; i < 10; i++ {
            consumer, err := nsq.NewConsumer(TOPIC, CHANNEL_1, config)
            if nil != err {
                fmt.Println("err", err)
                return
            }
            consumer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelInfo)
            consumer.AddHandler(&NSQHandler{})
            err = consumer.ConnectToNSQD(URL)
            if nil != err {
                fmt.Println("err", err)
                return
            }

            fmt.Println(CHANNEL_1,i)
        }
        select {}
    }()


    waiter.Wait()
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:谁不曾年少轻狂过

查看原文:nsq消息队列部署以及使用

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

638 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传