nsq使用的是源码编译安装。
使用源码编译安装的时候建立先建一个单独的gopath防止与原有的程序混淆
使用的环境是ubuntu14
1、先安装git:apt-get install git
2、go环境与gopath设置直接跳过自行百度
3、安装godep:go get github.com/kr/godep
4、安装assert:go get github.com/bmizerany/assert
5、我是直接把nsq的源代码下载下来,gopath下:src/github.com/nsqio/nsq下
6、一般使用godep安装是不会成功的,需要把Godeps文件删除重新生成(这里感觉主要作用是一步一步来解决nsq需要的依赖包):rm Godeps
7、重新生成,会提示缺少包这时就需要用go get把相应的包下载来下来:godep save ./...
8、如果提示缺少golang.org/下的包需要去https://github.com/golang下找把下载下来放到github.com/golang下:git clone https://github.com/golang/sys.git
9、依赖都找好后,没有用godep安装,而是用:make&make install
10、会在源码的根目录下建立一个build,在这个目录下有服务命令,同时也自动安装下/usr/local/bin目录下
搭建nsq消息服务器:
nsqlookupd:主要用于管理nsqd与topic的信息。可以选择性启动,直接启动:nohup ./nsqlookupd >> nsqlookupd.log 2>&1 &
nsqd:消息处理的服务,用于接收转发消息。可以单独直接启动,一般都会通过-lookupd-tcp-address参数指定nsqlookupd来完成管理。-deflate参数指定是否放弃一些特定的消息默认是true,true时会丢失一些信息,一般手动指定为false。启动nsqd服务:nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 -deflate=false >> nsqd.log 2>&1 &
nsqadmin:服务web管理界面,需要指定lookupd;启动方式:nohup ./nsqadmin -lookupd-http-address=127.0.0.1:4161 >> nsqd.log 2>&1 &
nsq消息服务的默认信息:
nsqlookupd:
-http-address:监听的http协议端口,默认是:0.0.0.0:4161
-tcp-address:监听的tcp协议端口,默认是:0.0.0.0:4160
nsqd:
-deflate:是否放弃一些特定的消息默认是true,如果设置为false可以保证不丢失消息。如:只启动发送消息的程序时,会把消息存储起来,等接收消息的程序启动时会把这些消息发送到相应的Consumer
-data-path:数据文件存放位置
-http-address:消息服务的http监听端口,默认是:0.0.0.0:4151
-http-client-connect-timeout:设置http连接超时时长,默认2秒
-http-client-request-timeout:设置http请求超时时长,默认5秒
-https-address:设置nsqd监听的https地址,默认是:0.0.0.0:4152
-lookupd-tcp-address:指定管理nsqd的nsqlookupd的tcp监听地址
-tcp-address:指定消息服务器的tcp监听端口,默认是:0.0.0.0:4150
nsqadmin:
-http-address:指定管理nsqd web服务的地址与端口,默认是:0.0.0.0:4171
-http-client-connect-timeout:设置http连接超时时长,默认2秒
-http-client-request-timeout:设置http请求超时时长,默认5秒
-lookupd-http-address:指定nsqlookupd管理的地址
nsqd操作命令:
1、创建一个topic(post提交):http://172.16.10.79:4151/topic/create?topic=name
2、删除一个topic(post提交):http://172.16.10.79:4151/topic/delete?topic=name
3、在topic下创建一个channel(post提交):http://172.16.10.79:4151/channel/create?topic=name&channel=name
4、在topic下删除一个channel(post提交):http://172.16.10.79:4151/channel/delete?topic=name&channel=name
5、清空一个topic(post提交):http://172.16.10.79:4151/topic/empty?topic=name
6、在topic下清空一个channel(post提交):http://172.16.10.79:4151/channel/empty?topic=name&channel=name
nsqd的http命令使用curl有一个限制,如果有两个参数以上,&符号后的信息会识别不了,一般我用的是程序通过http的post请求来完成。如下:
str := "http://172.16.10.79:4151/channel/create?topic=dubing&channel=binge"
resp, err := http.Post(str,
"application/x-www-form-urlencoded",
nil)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
}
fmt.Println(string(body))
golang客户端来完成发送与接收消息(使用"github.com/nsqio/go-nsq"包)
func Producer() {
p, err := nsq.NewProducer("172.16.10.79:4150", nsq.NewConfig())
if err != nil {
fmt.Println(err)
panic(err)
}
for {
if err := p.Publish("test", []byte("hello NSQ!!!"+time.Now().String())); err != nil {
fmt.Println(err)
panic(err)
}
time.Sleep(time.Second * 1)
}
}
type ConsumerT struct{}
func (self *ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
func Consumer() {
c, err := nsq.NewConsumer("test", "ch", nsq.NewConfig())
if err != nil {
fmt.Println(err)
panic(err)
}
//这里也可以直接使用函数
/*c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}))*/
c.AddHandler(&ConsumerT{})
if err := c.ConnectToNSQD("172.16.10.79:4150"); err != nil {
fmt.Println(err)
panic(err)
}
<-c.StopChan
}
func main() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go Consumer()
go Producer()
waiter.Wait()
}
有疑问加站长微信联系(非本文作者)