首先需要澄清一个事实:redis
服务端是单线程处理客户端请求,也就是说客户端请求在服务端是串行化执行的,因此对服务端来说,并不存在并发问题。但业务方却存在并发操作redis
中的同一个key
的情况。所以如何让A
客户端知道B
客户端正在操作它想操作的 key
,就成了必须要讨论的问题。
那么开始总结下方案吧:
1. SETNX key value //key存在就不做任何操作,返回0;不存在操作成功返回1
这种方式通过对需要操作的key
加锁来保证并发操作的串行化。这里我们以Golang
代码为例来举例说明该操作。先看多个协程写同一个key
的情况。代码如下:
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"runtime"
"sync"
"time"
)
var w sync.WaitGroup
func newRdsPool(server, auth string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
MaxActive: 30,
IdleTimeout: 60 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if auth == "" {
return c, err
}
if _, err := c.Do("AUTH", auth); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func g1(r redis.Conn) {
for i := 0; i < 2; i++ {
if _, err := redis.String(r.Do("set", "hello", "1")); err != nil {
fmt.Println(err)
}
time.Sleep(10 * time.Millisecond)
}
w.Done()
}
func g2(r redis.Conn) {
for i := 0; i < 2; i++ {
if _, err := redis.String(r.Do("set", "hello", "2")); err != nil {
fmt.Println(err)
}
time.Sleep(10 * time.Millisecond)
}
w.Done()
}
func main() {
w.Add(2)
runtime.GOMAXPROCS(runtime.NumCPU())
var rc1 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
var rc2 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
defer rc1.Close()
defer rc2.Close()
go g1(rc1)
go g2(rc2)
w.Wait()
}
执行上面的代码之后,hello
的值在1和2之间徘徊。希望出现的是如果协程1在操作时候,协程2就放弃操作,也即让操作串行化。这样就需要有一个锁来保证不能同时让两个协程进去临界区。setnx = set if not exists
不存在返回1,存在返回0。通过这个机制可以判断当前的lock
是否已经被设置了。lock
必须给一个过期时间,因为很有可能goroutine1
在do work
的时候出现panic
,这样就导致goroutine2
一直在尝试获取锁。
package main
import (
"fmt"
"github.com/garyburd/redigo/redis"
"runtime"
"sync"
"time"
)
var w sync.WaitGroup
func newRdsPool(server, auth string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
MaxActive: 30,
IdleTimeout: 60 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", server)
if err != nil {
return nil, err
}
if auth == "" {
return c, err
}
if _, err := c.Do("AUTH", auth); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func g1(r redis.Conn) {
var lock int64
var lock_timeout int64 = 2
var lock_time int64
var now int64
for lock != 1 {
now = time.Now().Unix()
lock_time = now + lock_timeout
lock, err1 := redis.Int64(r.Do("setnx", "foo", lock_time))
lockValue1, err2 := redis.Int64(r.Do("get", "foo"))
if lock == 1 && err1 == nil {
break
} else {
if now > lockValue1 && err2 == nil {
lockValue2, err3 := redis.Int64(r.Do("getset", "foo", lock_time))
if err3 == nil && now > lockValue2 {
break
} else {
fmt.Println(`g1 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
} else {
fmt.Println(`g1 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
}
}
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("set", "hello", "1")); err != nil {
fmt.Println(err)
}
fmt.Println(`g1 now work... `)
time.Sleep(1 * time.Second)
}
if time.Now().Unix() < lock_time {
if _, err4 := redis.Int64(r.Do("del", "foo")); err4 != nil {
fmt.Println(err4)
}
}
w.Done()
}
func g2(r redis.Conn) {
var lock int64
var lock_timeout int64 = 2
var lock_time int64
var now int64
for lock != 1 {
now = time.Now().Unix()
lock_time = now + lock_timeout
lock, err1 := redis.Int64(r.Do("setnx", "foo", lock_time))
lockValue1, err2 := redis.Int64(r.Do("get", "foo"))
if lock == 1 && err1 == nil {
break
} else {
if now > lockValue1 && err2 == nil {
lockValue2, err3 := redis.Int64(r.Do("getset", "foo", lock_time))
if err3 == nil && now > lockValue2 {
break
} else {
fmt.Println(`g2 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
} else {
fmt.Println(`g2 not get lock`)
time.Sleep(1000 * time.Millisecond)
}
}
}
for i := 0; i < 5; i++ {
if _, err := redis.String(r.Do("set", "hello", "2")); err != nil {
fmt.Println(err)
}
fmt.Println(`g2 now work... `)
time.Sleep(1 * time.Second)
}
if time.Now().Unix() < lock_time {
if _, err4 := redis.Int64(r.Do("del", "foo")); err4 != nil {
fmt.Println(err4)
}
}
w.Done()
}
func main() {
w.Add(2)
runtime.GOMAXPROCS(runtime.NumCPU())
var rc1 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
var rc2 redis.Conn = newRdsPool(`127.0.0.1:6379`, ``).Get()
defer rc1.Close()
defer rc2.Close()
go g1(rc1)
go g2(rc2)
w.Wait()
}
上面的代码给出了两个goroutine
通过锁达到串行化操作同一个key
的效果。
2. MULTI、DISCARD、 EXEC、WATCH // redis事务
3. 将对key的操作的值都放到一个list里面
有疑问加站长微信联系(非本文作者)