golang 关于rabbmitmq 的简易连接池

ccoding · · 1103 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

说明:

网上找了一圈好像没有rabbmitmq连接池的维护比较好的包,索性按照整理出来的自己写了一个简易版的,希望各位大神继续完善,目前只是实现了连接的一部分,具体tabbmitmq 的操作 按照 amqp的说明操作即可

这里有大部分的ampq的实例

https://blog.csdn.net/lastsweetop/article/details/91038836 

/***************************************************
 * @Time : 2019/11/21 6:46 下午
 * @Author : ccoding
 * @File : rabbmitmq
 * @Software: GoLand
 **************************************************/
package rabbitmqPool

import (
	"errors"
	"github.com/streadway/amqp"
	"sync"
	"time"
)

var (
	ErrInvalidConfig = errors.New("invalid pool config")
	ErrPoolClosed    = errors.New("pool closed")
)

type PoolConfig struct {
	MaxOpen     int           // 池中最大资源数
	NumOpen     int           // 当前池中资源数
	MinOpen     int           // 池中最少资源数
	Closed      bool          // 池是否已关闭
	IdleTimeout time.Duration //空闲连接连接超时时间
	WaitTimeOut time.Duration //等待获取连接超时时间
}
type NewConnection func() (*amqp.Connection, error)
type RabbitmqPool struct {
	mu    sync.Mutex
	conns chan *amqp.Connection

	newConnection func() (*amqp.Connection, error)
	poolConfig    *PoolConfig
}

func NewPool(config *PoolConfig, newConnection NewConnection) (*RabbitmqPool, error) {
	if config.MaxOpen <= 0 || config.MinOpen > config.MaxOpen {
		return nil, ErrInvalidConfig
	}
	p := &RabbitmqPool{
		conns:         make(chan *amqp.Connection, config.MaxOpen),
		newConnection: newConnection,
		poolConfig:    config,
	}
	for i := 0; i < config.MinOpen; i++ {
		conn, err := newConnection()
		if err != nil {
			continue
		}
		config.NumOpen++
		p.conns <- conn
	}
	return p, nil
}

func (p *RabbitmqPool) Get() (*amqp.Connection, error) {
	if p.poolConfig.Closed {
		return nil, ErrPoolClosed
	}
	for {
		conn, err := p.connection()
		if err != nil {
			return nil, err
		}
		// todo maxLifttime处理
		return conn, nil
	}
}

func (p *RabbitmqPool) connection() (*amqp.Connection, error) {
	select {
	case conn := <-p.conns:
		return conn, nil
	default:
		p.mu.Lock()
		if p.poolConfig.NumOpen >= p.poolConfig.MaxOpen {
			conn := <-p.conns
			p.mu.Unlock()
			return conn, nil
		}
		// 新建连接
		conn, err := p.newConnection()
		if err != nil {
			p.mu.Unlock()
			return nil, err
		}
		p.poolConfig.NumOpen++
		p.mu.Unlock()
		return conn, nil
	}
}

// 释放单个资源到连接池
func (p *RabbitmqPool) Release(conn *amqp.Connection) error {
	if p.poolConfig.Closed {
		return ErrPoolClosed
	}
	p.mu.Lock()
	p.conns <- conn
	p.mu.Unlock()
	return nil
}

// 关闭单个资源
func (p *RabbitmqPool) Close(conn *amqp.Connection) error {
	p.mu.Lock()
	conn.Close()
	p.poolConfig.NumOpen--
	p.mu.Unlock()
	return nil
}

// 关闭连接池,释放所有资源
func (p *RabbitmqPool) ClosePool() error {
	if p.poolConfig.Closed {
		return ErrPoolClosed
	}
	p.mu.Lock()
	close(p.conns)
	for conn := range p.conns {
		conn.Close()
		p.poolConfig.NumOpen--
	}
	p.poolConfig.Closed = true
	p.mu.Unlock()
	return nil
}
//打开通道
func  OpenChannel (conn *amqp.Connection) (*amqp.Channel,error){
	 ch,err := conn.Channel()
	 return ch,err
}

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1103 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传