go-rabbmitmq-pool连接池channel复用自动重连

kelley · · 930 次点击 · 开始浏览    置顶

## rabbitmq pool 连接池channel复用 # 励志于完善golang生态 项目地址: [https://gitee.com/tym_hmm/rabbitmq-pool-go=>点击直达](https://gitee.com/tym_hmm/rabbitmq-pool-go) 开发语言 golang 依赖库 go get -u gitee.com/tym_hmm/rabbitmq-pool-go ``` go get -u gitee.com/tym_hmm/rabbitmq-pool-go ``` > 已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力<br> > rabbitmq部署为线上集群 ### 功能说明 1. 自定义连接池大小及最大处理channel数 2. 消费者底层断线自动重连 3. 底层使用轮循方式复用tcp 4. 生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量 5. 支持rabbitmq exchangeType 6. 默认交换机、队列、消息都会持久化磁盘 7. 默认值 | 名称 | 说明 | | --- | --- | | tcp最大连接数 | 5 | | 生产者消费发送失败最大重试次数 | 5 | | 消费者最大channel信道数(每个连接自动平分) | 100(每个tcp10个) | ### 使用 1. 初始化 ``` var oncePool sync.Once var instanceRPool *kelleyRabbimqPool.RabbitPool func initrabbitmq() *kelleyRabbimqPool.RabbitPool { oncePool.Do(func() { //初始化生产者 instanceRPool = kelleyRabbimqPool.NewProductPool() //初始化消费者 instanceConsumePool = kelleyRabbimqPool.NewConsumePool() //使用默认虚拟host "/" err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest") //使用自定义虚 //err:=instanceConsumePool.ConnectVirtualHost("192.168.1.202", 5672, "guest", "guest", "/testHost") if err != nil { fmt.Println(err) } }) return instanceRPool } ``` 2. 生产者 ``` var wg sync.WaitGroup for i:=0;i<100000; i++ { wg.Add(1) go func(num int) { defer wg.Done() data:=kelleyRabbimqPool.GetRabbitMqDataFormat("testChange5", kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC, "textQueue5", "/", fmt.Sprintf("这里是数据%d", num)) _=instanceRPool.Push(data) }(i) } wg.Wait() ``` 3. 消费者 > 可定义多个消息者事件, 不通交换机, 队列, 路由 > > 每个事件独立 > ``` nomrl := &rabbitmq.ConsumeReceive{ #定义消费者事件 ExchangeName: "testChange31",//队列名称 ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT, Route: "", QueueName: "testQueue31", IsTry:true,//是否重试 IsAutoAck: false, //是否自动确认消息 MaxReTry: 5,//最大重试次数 EventFail: func(code int, e error, data []byte) { fmt.Printf("error:%s", e) }, /*** * 参数说明 * @param data []byte 接收的rabbitmq数据 * @param header map[string]interface{} 原rabbitmq header * @param retryClient RabbitmqPool.RetryClientInterface 自定义重试数据接口,重试需return true 防止数据重复提交 ***/ EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试 _ = retryClient.Ack()//确认消息 fmt.Printf("data:%s\n", string(data)) return true }, } instanceConsumePool.RegisterConsumeReceive(nomrl) err := instanceConsumePool.RunConsume() if err != nil { fmt.Println(err) } ``` > * 参数说明 | 名称 | 类型 | 说明 | | --- | --- | --- | | ExchangeName| string | 交换机名称 | | ExchangeType | string | 交换机类型: <br>EXCHANGE_TYPE_FANOUT<br>EXCHANGE_TYPE_DIRECT<br>EXCHANGE_TYPE_TOPIC | | Route| string | 路由键 | | QueueName | string | 队列名称 | | IsTry | bool | 是否重试<br>如果开启重试后, 在成功回调用返回true会对消息进行重试, 重试时间为 5000~15000 MS| | IsAutoAck | bool | 是否自动确认消息, true: 组件底层会自动对消息进行确认<br> false: 手动进行消息确认,在成功会调中需进行手动确认` _ = retryClient.Ack()` | | MaxReTry | int | 重试最大次数s, 需isTry为true | | EventFail | func | 失败回调 | | EventSuccess | func | 成功回调 | 4 错误码说明 > 错误码为 > 1. 生产者push时返回的 *RabbitMqError > 2. 消费者事件监听回返的 code | 错误码 | 说明 | | --- | --- | |501|生产者发送超过最大重试次数| |502|获取信道失败, 一般为认道队列数用尽| |503|交换机/队列/绑定失败| |504|连接失败| |506|信道创建失败| |507|超过最大重试次数| [https://gitee.com/tym_hmm/rabbitmq-pool-go=>点击直达](https://gitee.com/tym_hmm/rabbitmq-pool-go) 其它组件(已在生产使用): #### 依赖组件 * [http框架=>https://gitee.com/tym_hmm/eric-http.git](https://gitee.com/tym_hmm/eric-http.git) * [http路由组件=>https://gitee.com/tym_hmm/go-web-router.git](https://gitee.com/tym_hmm/go-web-router.git) * [mysql-pool连接池组件=>https://gitee.com/tym_hmm/mysql_pool.git](https://gitee.com/tym_hmm/mysql_pool.git) * [redis-pool连接池组件=>https://gitee.com/tym_hmm/redis-pool.git](https://gitee.com/tym_hmm/redis-pool.git) * [mongodb-pool连接池=>https://gitee.com/tym_hmm/mgo-pool.git](https://gitee.com/tym_hmm/mgo-pool.git)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

930 次点击  ∙  2 赞  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传