在上篇用纯go在本机模拟了下分布式队列的东西。这里补上Redis队列部份。
用Redis做队列有下面三个问题需解决:
1. 队列构建
使用Redis的RPUSH/LPOP来解决
2. 参数传递/解析
客户端将JOSN参数存入Redis,Server端取出后解析还原。
3. 连接池
用Redis做队列有下面三个问题需解决:
1. 队列构建
使用Redis的RPUSH/LPOP来解决
2. 参数传递/解析
客户端将JOSN参数存入Redis,Server端取出后解析还原。
3. 连接池
redigo支持Redis连接池
下面代码就是具体解决实现:
//Redis做后台任务队列 //author: Xiong Chuan Liang //date: 2015-3-25 package main import ( "bytes" "encoding/json" "errors" "fmt" "time" "github.com/garyburd/redigo/redis" ) func main() { r, err := newRedisPool("", "") if err != nil { fmt.Println(err) return } //将job放入队列 r.Enqueue() //依次取出两个Job r.GetJob() r.GetJob() } type RedisPool struct { pool *redis.Pool } func newRedisPool(server, password string) (*RedisPool, error) { if server == "" { server = ":6379" } pool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", server) if err != nil { return nil, err } if password != "" { if _, err := c.Do("AUTH", password); err != nil { c.Close() return nil, err } } return c, err }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, } return &RedisPool{pool}, nil } type Job struct { Class string `json:"Class"` Args []interface{} `json:"Args"` } //模拟客户端 func (r *RedisPool) Enqueue() error { c := r.pool.Get() defer c.Close() j := &Job{} j.Class = "mail" j.Args = append(j.Args, "xcl_168@aliyun.com", "", "body", 2, true) j2 := &Job{} j2.Class = "Log" j2.Args = append(j2.Args, "ccc.log", "ddd.log", []int{222, 333}) for _, v := range []*Job{j, j2} { b, err := json.Marshal(v) if err != nil { return err } _, err = c.Do("rpush", "queue", b) if err != nil { return err } } fmt.Println("[Enqueue()] succeed!") return nil } //模拟Job Server func (r *RedisPool) GetJob() error { count, err := r.QueuedJobCount() if err != nil || count == 0 { return errors.New("暂无Job.") } fmt.Println("[GetJob()] Jobs count:", count) c := r.pool.Get() defer c.Close() for i := 0; i < int(count); i++ { reply, err := c.Do("LPOP", "queue") if err != nil { return err } var j Job decoder := json.NewDecoder(bytes.NewReader(reply.([]byte))) if err := decoder.Decode(&j); err != nil { return err } fmt.Println("[GetJob()] ", j.Class, " : ", j.Args) } return nil } func (r *RedisPool) QueuedJobCount() (int, error) { c := r.pool.Get() defer c.Close() lenqueue, err := c.Do("llen", "queue") if err != nil { return 0, err } count, ok := lenqueue.(int64) if !ok { return 0, errors.New("类型转换错误!") } return int(count), nil } /* 运行结果: [Enqueue()] succeed! [GetJob()] Jobs count: 2 [GetJob()] mail : [xcl_168@aliyun.com body 2 true] [GetJob()] Log : [ccc.log ddd.log [222 333]] [root@xclos src]# */可以看到Go已能取得参数。都是些最基础的东西。
MAIL: xcl_168@aliyun.com
BLOG: http://blog.csdn.net
有疑问加站长微信联系(非本文作者)