Go Redigo 源码分析(三) 执行命令

大二小的宝 · · 683 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

简单使用

简单使用Do函数获取单条和使用童丹请求多条获取多条数据。

func main() {
    // 1. 创建连接池
    // 2. 简单设置连接池的最大链接数等参数
    // 3. 注入拨号函数
    // 4. 调用pool.Get() 获取连接
    pool := &redis.Pool{
        MaxIdle:   4,
        MaxActive: 4,
        Dial: func() (redis.Conn, error) {
            rc, err := redis.Dial("tcp", "127.0.0.1:6379")
            if err != nil {
                return nil, err
            }
            return rc, nil
        },
        IdleTimeout: time.Second,
        Wait:        true,
    }
    con := pool.Get()
    // 获取单条
    str, err := redis.String(con.Do("get", "aaa"))
    fmt.Println(str, err)
    // 通道 发送多条接受多条
    con.Send("get", "aaa")
    con.Send("get", "bbb")
    con.Send("get", "ccc")
    con.Flush()
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)
    con.Close()
}

源码查看

上一篇看了Get方法获取连接池中的链接,获取到连接之后调用Do函数请求redis服务获取回复。现在我们就需要看Do函数的源码
1. Conn接口 在rediso中有两个对象都实现了这个接口

type Conn interface {
    // Close closes the connection.
    Close() error

    // Err returns a non-nil value when the connection is not usable.
    Err() error

    // Do sends a command to the server and returns the received reply.
    Do(commandName string, args ...interface{}) (reply interface{}, err error)

    // Send writes the command to the client's output buffer.
    Send(commandName string, args ...interface{}) error

    // Flush flushes the output buffer to the Redis server.
    Flush() error

    // Receive receives a single reply from the Redis server
    Receive() (reply interface{}, err error)
}

// 连接池对外的连接对象
type activeConn struct {
    p     *Pool
    pc    *poolConn
    state int
}

// 连接对象 
type conn struct {
    //  锁
    mu      sync.Mutex
    pending int
    err     error
    // http 包中的conn对象
    conn    net.Conn

    // 读入过期时间
    readTimeout time.Duration
    // bufio reader对象 用于读取redis服务返回的结果
    br          *bufio.Reader

    // 写入过期时间
    writeTimeout time.Duration
    // bufio writer对象 带buf 用于往服务端写命令
    bw           *bufio.Writer

    // Scratch space for formatting argument length.
    // '*' or '$', length, "\r\n"
    lenScratch [32]byte

    // Scratch space for formatting integers and floats.
    numScratch [40]byte
}

Do 函数
Do函数最终调用的是conn对象的DoWithTimeout函数
DoWithTimeout函数负责将请求的命令发送到redis服务 再从redis服务读取回复
writeCommand函数是写入命令函数
readReply函数是读取函数

// active conn Do函数 设定请求状态用于关闭时候退出命令
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
    pc := ac.pc
    if pc == nil {
        return nil, errConnClosed
    }
    // 查看是否需要改变状态
    ci := lookupCommandInfo(commandName)
    ac.state = (ac.state | ci.Set) &^ ci.Clear
    return pc.c.Do(commandName, args...)
}

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
    return c.DoWithTimeout(c.readTimeout, cmd, args...)
}

// conn 执行命令函数
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
    c.mu.Lock()
    pending := c.pending
    c.pending = 0
    c.mu.Unlock()

    if cmd == "" && pending == 0 {
        return nil, nil
    }
    // 设置下入超时时间
    if c.writeTimeout != 0 {
        c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
    }

    // 如果cmd不为空则写入redis命令
    if cmd != "" {
        // 写入命令道buf中
        if err := c.writeCommand(cmd, args); err != nil {
            return nil, c.fatal(err)
        }
    }
    // 把写入的buf 的command 写入conn中 正式发送到服务器
    if err := c.bw.Flush(); err != nil {
        return nil, c.fatal(err)
    }

    var deadline time.Time
    if readTimeout != 0 {
        deadline = time.Now().Add(readTimeout)
    }
    c.conn.SetReadDeadline(deadline)

    if cmd == "" {
        reply := make([]interface{}, pending)
        for i := range reply {
            r, e := c.readReply()
            if e != nil {
                return nil, c.fatal(e)
            }
            reply[i] = r
        }
        return reply, nil
    }

    var err error
    var reply interface{}
    for i := 0; i <= pending; i++ {
        var e error
        if reply, e = c.readReply(); e != nil {
            return nil, c.fatal(e)
        }
        if e, ok := reply.(Error); ok && err == nil {
            err = e
        }
    }
    return reply, err
}

// 把command写入到conn的write中
// 1. 先写入*号
// 2. 再写入command
// 3. 最后写入参数
func (c *conn) writeCommand(cmd string, args []interface{}) error {
    c.writeLen('*', 1+len(args))
    if err := c.writeString(cmd); err != nil {
        return err
    }
    for _, arg := range args {
        if err := c.writeArg(arg, true); err != nil {
            return err
        }
    }
    return nil
}

// 读取redis回复 通过判断回复雷星星 + - : $来解析
func (c *conn) readReply() (interface{}, error) {
    line, err := c.readLine()
    if err != nil {
        return nil, err
    }
    if len(line) == 0 {
        return nil, protocolError("short response line")
    }
    switch line[0] {
    // 回复状态
    case '+':
        switch {
        case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
            // Avoid allocation for frequent "+OK" response.
            return okReply, nil
        case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
            // Avoid allocation in PING command benchmarks :)
            return pongReply, nil
        default:
            return string(line[1:]), nil
        }
    // 错误回复
    case '-':
        return Error(string(line[1:])), nil
    // 整数回复
    case ':':
        return parseInt(line[1:])
    // 批量回复
    case '$':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {
            return nil, err
        }
        p := make([]byte, n)
        _, err = io.ReadFull(c.br, p)
        if err != nil {
            return nil, err
        }
        if line, err := c.readLine(); err != nil {
            return nil, err
        } else if len(line) != 0 {
            return nil, protocolError("bad bulk string format")
        }
        return p, nil
        // 多条批量回复
    case '*':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {
            return nil, err
        }
        r := make([]interface{}, n)
        for i := range r {
            r[i], err = c.readReply()
            if err != nil {
                return nil, err
            }
        }
        return r, nil
    }
    return nil, protocolError("unexpected response line")
}

总结

为了方便些注释,Fork一份加上了一些注释希望对于理解有帮助:连接地址
其实我们可以看到Redigo其实就是帮助我们把命令发送到redis服务中兵获取redis回复。总体的流程还是比较简单的。


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:大二小的宝

查看原文:Go Redigo 源码分析(三) 执行命令

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

683 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传