用Redis实现分布式锁 与 实现任务队列

langzhiwu · 2018-06-27 17:27:15 · 1353 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2018-06-27 17:27:15 的主题,其中的信息可能已经有所发展或是发生改变。

大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了,另一个问题是,秒杀的东西都是有量的,例如一款手机只有10台的量秒杀,那么,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个时候的先后顺序是很乱的,很容易出现10台的量,抢到的人就不止10个这种严重的问题。那么,以后所说的问题我们该如何去解决呢? 接下来我所分享的技术就可以拿来处理以上的问题: 分布式锁 和 任务队列。

代码实现: /**

  • 加锁
  • @param string name 锁的标识名
  • @param int timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待
  • @param int expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
  • @param int waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒)
  • @return bool true成功 false失败 */

func (rlock *RedisLock) Lock(name string, timeout int, expire int, waitIntervalUs int) bool { if name == "" { return false }

//获取当前时间
nowtime := time.Now().Unix()
//获取锁失败时的等待超时时刻
timeoutAt := nowtime + int64(timeout)
//锁的最大生存时刻
expireAt := nowtime + int64(expire)
//redis存放时的key
redisKey := "kx:" + name

for {
    //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放
    result, err := rlock.Redis_conn.Do("SETNX", redisKey, expire)
    if err != nil {
        return false
    }

    if result != 0 {
        //设置key的失效时间
        rlock.Redis_conn.Do("EXPIRE", redisKey, expire)
        //将锁标志放到lockedNames数组里
        rlock.LockedNames[name] = expireAt
        return true
    }

    //以秒为单位,返回给定key的剩余生存时间
    ttl, _ := redis.Int64(rlock.Redis_conn.Do("TTL", redisKey))
    //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
    //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
    //这时可以直接设置expire并把锁纳为己用
    if ttl < 0 {
        rlock.Redis_conn.Do("EXPIRE", redisKey, expire)
        rlock.LockedNames[name] = expireAt
        return true
    }

    /*****循环请求锁部分*****/
    //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出
    if timeout <= 0 || timeoutAt < time.Now().UnixNano()/1e6 {
        break
    }
    time.Sleep(time.Duration(waitIntervalUs))
}
return false

}

/**

  • 解锁
  • @param string name 锁的标识名 / func (rlock RedisLock) Unlock(name string) bool { //先判断是否存在此锁 if _, ok := rlock.LockedNames[name]; ok {
      //删除锁
      redisKey := "kx:" + name
      _, err := rlock.Redis_conn.Do("DEL", redisKey)
      if err != nil {
          return false
      }
      //清掉lockedNames里的锁标志
      delete(rlock.LockedNames, name)
      return true
    
    } return false }

完整代码https://github.com/langzhiwu/rdslock

新手学习,欢迎拍砖!


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1353 次点击  ∙  1 赞  
加入收藏 微博
5 回复  |  直到 2018-06-28 18:31:41
buscoop
buscoop · #1 · 7年之前

拍砖star

marlonche
marlonche · #2 · 7年之前
//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建)
//如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用
//这时可以直接设置expire并把锁纳为己用
if ttl < 0 {
    rlock.Redis_conn.Do("EXPIRE", redisKey, expire)
    rlock.LockedNames[name] = expireAt
    return true
}

这种情况下可能会有多个进程同时执行到这个条件里面,导致这些进程都获取到了锁

if timeout <= 0 || timeoutAt < time.Now().UnixNano()/1e6 {
    break
}
time.Sleep(time.Duration(waitIntervalUs))

time.Now().UnixNano()/1e6的单位是ms,而timeoutAt单位是秒; time.Duration(waitIntervalUs)是把waitIntervalUs当纳秒用了

langzhiwu
langzhiwu · #3 · 7年之前
marlonchemarlonche #2 回复

//ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 //这时可以直接设置expire并把锁纳为己用 if ttl < 0 { rlock.Redis_conn.Do("EXPIRE", redisKey, expire) rlock.LockedNames[name] = expireAt return true } 这种情况下可能会有多个进程同时执行到这个条件里面,导致这些进程都获取到了锁 if timeout <= 0 || timeoutAt < time.Now().UnixNano()/1e6 { break } time.Sleep(time.Duration(waitIntervalUs)) `time.Now().UnixNano()/1e6`的单位是ms,而timeoutAt单位是秒; `time.Duration(waitIntervalUs)`是把waitIntervalUs当纳秒用了

这个问题使用GETSET处理的,在github已处理。感谢反馈! 微信图片_20180628115713.png

marlonche
marlonche · #4 · 7年之前

在下面这种情况下可能会有问题:

A获取了锁,在expire之前A没有unlock,锁expire,B获取到了锁,A这时unlock,C获取到锁,这时B和C同时拥有锁

我觉得应该在每次获取锁时存一个uuid到redisKey:

result, err := rlock.Redis_conn.Do("SETNX", redisKey, uuid)

在unlock时如下处理:

func (rlock *RedisLock) Unlock(name string, uuid string) bool {
    redisKey := "kx:" + name
    if exist, _ := redis.Int(rlock.Redis_conn.Do("EXPIRE", redisKey, expire)); 0 == exist {
        //redisKey已经expire
        return true
    }
    //仍被本进程持有,或已经被其它进程持有
    if _uuid, _ := redis.String(rlock.Redis_conn.Do("GET", redisKey)); _uuid != uuid {
        //已经被其它进程持有
        return true
    }
    _, err := rlock.Redis_conn.Do("DEL", redisKey)
    if err != nil {
            return false
    }
}

LockedNames这个map似乎多余,并且在同一进程的多个goroutine使用同一lock的情况下这个map在按下面的顺序执行时会有问题:

  1. goroutine A 执行_, err := rlock.Redis_conn.Do("DEL", redisKey)

  2. goroutine B 执行rlock.Redis_conn.Do("SETNX", redisKey, expireAt)成功,然后执行rlock.LockedNames[name] = expireAt

  3. goroutine A 执行delete(rlock.LockedNames, name)

  4. goroutine B 执行Unlock时if _, ok := rlock.LockedNames[name]; ok条件不ok,rlock.Redis_conn.Do("DEL", redisKey)没执行

marlonche
marlonche · #5 · 7年之前

或者在Unlock时可以用lua脚本将下面的判断做成原子操作:

if _uuid, _ := redis.String(rlock.Redis_conn.Do("GET", redisKey)); _uuid == uuid {
    rlock.Redis_conn.Do("DEL", redisKey)
}
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传