看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的
认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到
指定nsqd之后,从nsqd中获取消息。
生产者
我们以nsq/apps/to_nsq/to_nsq.go为例,客户端这边的代码逻辑就简单很多,NewProducer实例化一个instance,publish消息
到nsqd。
/// nsq/apps/to_nsq/to_nsq.go
producer, err := nsq.NewProducer(addr, cfg)
err := producer.Publish(*topic, line)
下面来看下Publish里的具体逻辑。
// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
// 生成具体的cmd
return w.sendCommand(Publish(topic, body))
}
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
// 这里是一个lazily connect
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
在connect函数里启动了一个go routine去处理transactionChan对应的东西
func (w *Producer) connect() error {
w.closeChan = make(chan int)
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
_, err := w.conn.Connect()
w.wg.Add(1)
go w.router()
这里需要注意一下, go-nsq/conn.go是对底层连接的一个抽象,他是不关心你是生产者还是消费者,这里使用到了
delegate 模式,conn.go收到消息的处理放到了producerConnDelegate和consumerConnDelegate中,然后通知到具体的
消费者活着生产者。
消费者
回过头我们再来看下消费者部分的代码,client端我们以nsq/apps/nsq_tail/nsq_tail.go为例,代码的基本逻辑如下:
// 1. new comsunmer instanace
consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
// 2. add handler
consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})
// 3. connect to nsqd
consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
log.Fatal(err)
}
// 4. connect to lookupd
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
log.Fatal(err)
}
consumers = append(consumers, consumer)
下面来看下每个部分的实际代码:
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
if atomic.LoadInt32(&r.connectedFlag) == 1 {
panic("already connected")
}
atomic.AddInt32(&r.runningHandlers, int32(concurrency))
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
至此handler添加完成,起一个单独的go routine来等待消息的到了。
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages // 有新的消息的到来
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message) // 调用之前注册的handler
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
官方是不推荐只部署nqd而不部署lookupd的,我们直接看下lookup的连接过程:
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
...
r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
numLookupd := len(r.lookupdHTTPAddrs)
r.mtx.Unlock()
// if this is the first one, kick off the go loop
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
return nil
}
在queryLookupd中先去查询lookupd获取最新的nqd地址,然后connect to nsqd.
func (r *Consumer) lookupdLoop() {
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
ticker = time.NewTicker(r.config.LookupdPollInterval)
// 每个ticker interval更新nqd的地址信息
for {
select {
case <-ticker.C:
r.queryLookupd()
case <-r.lookupdRecheckChan:
r.queryLookupd()
case <-r.exitChan:
goto exit
}
}
}
func (r *Consumer) ConnectToNSQD(addr string) error {
// 1. new connection
conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
conn.SetLogger(logger, logLvl,
fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))
// 2. connection list
_, pendingOk := r.pendingConnections[addr]
_, ok := r.connections[addr]
r.pendingConnections[addr] = conn
if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
}
r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
// 3. new connect
// 3.1 go c.readLoop()
// 3.2 go c.writeLoop()
resp, err := conn.Connect()
// 4. sub to nsqd
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
}
以上就是客户端初始化的一个流程,然后就是接受消息处理了。
->NewConsumer() // 新建一个consumer
->ConnectToNSQLookupds() // 连接到lookupd
|->ConnectToNSQLookupd() // 连接到lookupd
|->r.queryLookupd() // 查询lookupd的
|->apiRequestNegotiateV1() // 调用lookupd的rest api获取nsqd消息
|->ConnectToNSQD() // 连接到具体nsq
|->NewConn() // 连接instance
|->conn.Connect() // 开始连接
|->c.readLoop() // 与nqd连接read loop
|->c.writeLoop() // 与nqd连接write loop
|->Subscribe() // consumer发送SUB command
|->lookupdLoop() // 定时查询lookupd并更新nsqd信息
注:
[1]. 关于delegate模式参考 这里
有疑问加站长微信联系(非本文作者)