消息队列依靠生产者和消费者协作运转:生产者产生消息并放入队列,消费者从队列中取出消息。
1.消费者 type Producer struct { id int64 addr string conn producerConn config Config logger logger logLvl LogLevel logGuard sync.RWMutex responseChan chan []byte errorChan chan []byte closeChan chan int transactionChan chan *ProducerTransaction transactions []*ProducerTransaction state int32 concurrentProducers int32 stopFlag int32 exitChan chan int wg sync.WaitGroup guard sync.Mutex } func TestProducerConnection(t *testing.T) { config := NewConfig() laddr := "127.0.0.1" config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0") w, _ := NewProducer("127.0.0.1:4150", config) w.SetLogger(nullLogger, LogLevelInfo) err := w.Publish("write_test", []byte("test")) if err != nil { t.Fatalf("should lazily connect - %s", err) } w.Stop() err = w.Publish("write_test", []byte("fail test")) if err != ErrStopped { t.Fatalf("should not be able to write after Stop()") } }
有疑问加站长微信联系(非本文作者)