简单使用
简单使用Do函数获取单条和使用童丹请求多条获取多条数据。
func main() {
// 1. 创建连接池
// 2. 简单设置连接池的最大链接数等参数
// 3. 注入拨号函数
// 4. 调用pool.Get() 获取连接
pool := &redis.Pool{
MaxIdle: 4,
MaxActive: 4,
Dial: func() (redis.Conn, error) {
rc, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
return rc, nil
},
IdleTimeout: time.Second,
Wait: true,
}
con := pool.Get()
// 获取单条
str, err := redis.String(con.Do("get", "aaa"))
fmt.Println(str, err)
// 通道 发送多条接受多条
con.Send("get", "aaa")
con.Send("get", "bbb")
con.Send("get", "ccc")
con.Flush()
str, err = redis.String(con.Receive())
fmt.Println("value: ", str, " err:", err)
str, err = redis.String(con.Receive())
fmt.Println("value: ", str, " err:", err)
str, err = redis.String(con.Receive())
fmt.Println("value: ", str, " err:", err)
con.Close()
}
源码查看
上一篇看了Get方法获取连接池中的链接,获取到连接之后调用Do函数请求redis服务获取回复。现在我们就需要看Do函数的源码
1. Conn接口 在rediso中有两个对象都实现了这个接口
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
// 连接池对外的连接对象
type activeConn struct {
p *Pool
pc *poolConn
state int
}
// 连接对象
type conn struct {
// 锁
mu sync.Mutex
pending int
err error
// http 包中的conn对象
conn net.Conn
// 读入过期时间
readTimeout time.Duration
// bufio reader对象 用于读取redis服务返回的结果
br *bufio.Reader
// 写入过期时间
writeTimeout time.Duration
// bufio writer对象 带buf 用于往服务端写命令
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
Do 函数
Do函数最终调用的是conn对象的DoWithTimeout函数
DoWithTimeout函数负责将请求的命令发送到redis服务 再从redis服务读取回复
writeCommand函数是写入命令函数
readReply函数是读取函数
// active conn Do函数 设定请求状态用于关闭时候退出命令
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
// 查看是否需要改变状态
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
return c.DoWithTimeout(c.readTimeout, cmd, args...)
}
// conn 执行命令函数
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if cmd == "" && pending == 0 {
return nil, nil
}
// 设置下入超时时间
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
// 如果cmd不为空则写入redis命令
if cmd != "" {
// 写入命令道buf中
if err := c.writeCommand(cmd, args); err != nil {
return nil, c.fatal(err)
}
}
// 把写入的buf 的command 写入conn中 正式发送到服务器
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
var deadline time.Time
if readTimeout != 0 {
deadline = time.Now().Add(readTimeout)
}
c.conn.SetReadDeadline(deadline)
if cmd == "" {
reply := make([]interface{}, pending)
for i := range reply {
r, e := c.readReply()
if e != nil {
return nil, c.fatal(e)
}
reply[i] = r
}
return reply, nil
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
return reply, err
}
// 把command写入到conn的write中
// 1. 先写入*号
// 2. 再写入command
// 3. 最后写入参数
func (c *conn) writeCommand(cmd string, args []interface{}) error {
c.writeLen('*', 1+len(args))
if err := c.writeString(cmd); err != nil {
return err
}
for _, arg := range args {
if err := c.writeArg(arg, true); err != nil {
return err
}
}
return nil
}
// 读取redis回复 通过判断回复雷星星 + - : $来解析
func (c *conn) readReply() (interface{}, error) {
line, err := c.readLine()
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, protocolError("short response line")
}
switch line[0] {
// 回复状态
case '+':
switch {
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
// 错误回复
case '-':
return Error(string(line[1:])), nil
// 整数回复
case ':':
return parseInt(line[1:])
// 批量回复
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, err
}
if line, err := c.readLine(); err != nil {
return nil, err
} else if len(line) != 0 {
return nil, protocolError("bad bulk string format")
}
return p, nil
// 多条批量回复
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
总结
为了方便些注释,Fork一份加上了一些注释希望对于理解有帮助:连接地址
其实我们可以看到Redigo其实就是帮助我们把命令发送到redis服务中兵获取redis回复。总体的流程还是比较简单的。
有疑问加站长微信联系(非本文作者)