开发了一个基于key的加锁方案

kzh125 · 2020-06-16 11:45:40 · 4926 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2020-06-16 11:45:40 的主题,其中的信息可能已经有所发展或是发生改变。

假如在开发一个http接口,在并发调用传入相同参数时,需要加锁,一个一个执行。在并发调用传入不同参数时,需要并发处理,不能加锁,这时需要一个基于key的加锁。

于是我开发了一个Multilocker类,用法为:使用l := NewMultilocker()获取Multilocker对象,使用mu := l.Get("key")获取sync.mutex,然后使用这个mutex加锁解锁,最后使用l.Put("key")归还这个sync.mutex

如果同一时间并发调用Get传入的为相同的key,则获取的是同一个sync.Mutex,如果同一时间并发调用Get传入的为不同的key,则获取的是不同的sync.Mutex

multilock.go

package xsync

import (
    "sync"
)

type mutexItem struct {
    mu      sync.Mutex
    counter int64
}

// mutexPool is not concurrent safe
type mutexPool struct {
    pool []*mutexItem
}

func (p *mutexPool) Put(mi *mutexItem) {
    p.pool = append(p.pool, mi)
}

func (p *mutexPool) Get() *mutexItem {
    if len(p.pool) == 0 {
        return &mutexItem{}
    }
    i := len(p.pool) - 1
    mi := p.pool[i]
    p.pool = p.pool[:i]
    return mi
}

type messageGet struct {
    key string
    c   chan *sync.Mutex
}

type messagePut string

// Multilocker is key based multiple locker
type Multilocker struct {
    gets  chan messageGet
    puts  chan messagePut
    inUse map[string]*mutexItem
    done  chan struct{}
    mp    mutexPool
    cp    sync.Pool
}

// Get get key related sync.Mutex
func (l *Multilocker) Get(key string) *sync.Mutex {
    msg := messageGet{
        key: key,
        // c:   l.cp.Get().(chan *sync.Mutex),
        c: make(chan *sync.Mutex, 1),
    }
    l.gets <- msg
    mu := <-msg.c
    // l.cp.Put(msg.c)
    return mu
}

// Put release key related mutexItem
func (l *Multilocker) Put(key string) {
    l.puts <- messagePut(key)
}

// Close will stop schedule
func (l *Multilocker) Close() {
    close(l.done)
}

func (l *Multilocker) handleGet(msg *messageGet) {
    key := msg.key
    c := msg.c
    mi, ok := l.inUse[key]
    if !ok {
        mi = l.mp.Get()
        l.inUse[key] = mi
    }
    mi.counter++
    c <- &mi.mu
}

func (l *Multilocker) handlePut(msg *messagePut) {
    key := string(*msg)
    mi, ok := l.inUse[key]
    if !ok {
        panic("should call lock first")
    }
    mi.counter--
    if mi.counter == 0 {
        l.mp.Put(mi)
        delete(l.inUse, key)
    }
}

func (l *Multilocker) schedule() {
loop:
    for {
        select {
        case msg := <-l.gets:
            l.handleGet(&msg)
        case msg := <-l.puts:
            l.handlePut(&msg)
        case <-l.done:
            break loop
        }
    }
}

// NewMultilocker return a new Multilocker
func NewMultilocker() *Multilocker {
    l := &Multilocker{
        gets:  make(chan messageGet, 1000),
        puts:  make(chan messagePut, 1000),
        inUse: make(map[string]*mutexItem),
        cp: sync.Pool{
            New: func() interface{} { return make(chan *sync.Mutex, 1) },
        },
    }
    go l.schedule()
    return l
}

multilock_test.go

package xsync

import (
    "log"
    "strconv"
    "sync"
    "testing"
    "time"
)

func TestSingleLock(t *testing.T) {
    l := NewMultilocker()
    mu := l.Get("3")
    mu.Lock()
    mu.Unlock()
    l.Put("3")
}

func TestMultiple(t *testing.T) {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("start1")
    defer log.Println("end")

    l := NewMultilocker()
    var n sync.WaitGroup
    for i := 0; i < 2000; i++ {
        n.Add(1)
        i := i
        key := strconv.Itoa(i % 2)
        // key := strconv.Itoa(i)
        go func() {
            defer n.Done()
            mu := l.Get(key)
            mu.Lock()
            // log.Printf("%d - %s start", i, key)
            time.Sleep(time.Millisecond * 10)
            // log.Printf("%d - %s end", i, key)
            mu.Unlock()
            l.Put(key)
        }()
    }
    n.Wait()
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

4926 次点击  ∙  1 赞  
加入收藏 微博
21 回复  |  直到 2021-03-17 14:42:42
jarlyyn
jarlyyn · #1 · 5年之前

…………

一个sync.Map,值是sync.Mutex类型就可以了……

jarlyyn
jarlyyn · #2 · 5年之前

找了下我的代码,大概是这样的

func NewUtil() *Util {
    return &Util{
        locks: &sync.Map{},
    }
}

type Util struct {
    locks     *sync.Map
}

func (u *Util) Locker(key string) (*Locker, bool) {
       newlocker := &Locker{   
            Map: u.locks,  
            Key: key,
        }
    v, ok := u.locks.LoadOrStore(key, newlocker)
    return v.(*Locker), ok
}

type Locker struct {
        sync.RWMutex
        Map *sync.Map
        Key string
}

func (l *Locker) Unlock() {
    l.RWMutex.Unlock()
    l.Map.Delete(l.Key)
}
jarlyyn
jarlyyn · #3 · 5年之前

想了下,你想要实现的功能和我不完全一样。

我的是第一次unlock后就释放整个key的锁。

你应该是以最后一个元素unlock为准。

那就再加个计数,在计数为0后再delete元素就可以了。

说到底就是一个带锁的locker的map或者干脆是sync.map的问题么……

kzh125
kzh125 · #4 · 5年之前

第一次unlock后就释放整个key的锁: 有点类似cache,第一次取的时候缓存没有,需要加锁获取到缓存里,后续直接从缓存获取。可以参考The Go Programming Language的第9章的9.7. Example: Concurrent Non-Blocking Cache 的实现。

这个9.7最后也用了一个单独的channel来实现,因为这样性能比较好。如果使用共享变量,老是要加锁解锁的访问共享变量的内容,并且写的代码容易出bug。使用channel实现的话,代码可读性高,易于理解。

毕竟Go比较著名的谚语是:Don't communicate by sharing memory, share memory by communicating.

我这个场景是同一时间相同key,需要先后执行,不能同一时间执行,所以需要counter。

我先看下你的代码

kzh125
kzh125 · #5 · 5年之前

看了下你这个代码,确实不能实现并发时同一个key加锁先后执行:

假设有10个并发同时调用Locker("key1"),然后其中一个执行完毕并调用l.Map.Delete(l.Key)删除,剩下9个。 后面又来了10个并发调用Locker("key1"),则后来的10个并发和前面的9个拥有的不是同一个锁,无法起到加锁的作用。所以需要counter,但是不用channel的话,这个真不好实现的,纸上谈兵没用...

另外我觉得你这个代码性能不够好,开始有10个并发同时调用Locker("key1"),之后第一个执行完毕后,剩下9个应当立刻返回(例如使用close channel来广播,参考9.7最后的实现),但实际上你这个还要加锁解锁。

jarlyyn
jarlyyn · #6 · 5年之前
kzh125kzh125 #5 回复

看了下你这个代码,确实不能实现并发时同一个key加锁先后执行: 假设有10个并发同时调用`Locker("key1")`,然后其中一个执行完毕并调用`l.Map.Delete(l.Key)`删除,剩下9个。 后面又来了10个并发调用`Locker("key1")`,则后来的10个并发和前面的9个拥有的不是同一个锁,无法起到加锁的作用。所以需要counter,但是不用channel的话,这个真不好实现的,纸上谈兵没用... 另外我觉得你这个代码性能不够好,开始有10个并发同时调用`Locker("key1")`,之后第一个执行完毕后,剩下9个应当立刻返回(例如使用close channel来广播,参考9.7最后的实现),但实际上你这个还要加锁解锁。

chan 本身就是基于锁实现的……

基于chan来实现无锁是不太可能有性能优势的。

kzh125
kzh125 · #7 · 5年之前

那就再加个计数,在计数为0后再delete元素就可以了

这个你不用channel的话, counter++ counter-- if counter ==0,等相关代码,没办法保证在并发情况下没有race的,

jarlyyn
jarlyyn · #8 · 5年之前
kzh125kzh125 #7 回复

> *那就再加个计数,在计数为0后再delete元素就可以了* 这个你不用channel的话, `counter++` `counter--` `if counter ==0`,等相关代码,没办法保证在并发情况下没有race的,

我这里有锁啊……

jarlyyn
jarlyyn · #9 · 5年之前
package xsync

import "sync"

func NewUtil() *Util {
    return &Util{
        locks: &sync.Map{},
    }
}

type Util struct {
    locks *sync.Map
}

func (u *Util) Locker(key string) (*Locker, bool) {
    newlocker := &Locker{
        Map: u.locks,
        Key: key,
    }
    v, ok := u.locks.LoadOrStore(key, newlocker)
    l := v.(*Locker)
    l.countlocker.Lock()
    l.count++
    l.countlocker.Unlock()
    return l, ok
}

type Locker struct {
    sync.RWMutex
    Map         *sync.Map
    count       int
    countlocker sync.Mutex
    Key         string
}

func (l *Locker) Unlock() {
    l.RWMutex.Unlock()
}

func (l *Locker) Lock() {
    l.RWMutex.Lock()
}

func (l *Locker) Close() {
    l.countlocker.Lock()
    l.count--
    if l.count == 0 {
        l.Map.Delete(l.Key)
    }

    l.countlocker.Unlock()
}
jarlyyn
jarlyyn · #10 · 5年之前

benchmark_test.go

package xsync

import (
    "testing"
)

func BenchmarkUtil(b *testing.B) {
    var i = 0
    u := NewUtil()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            locker, _ := u.Locker("test")
            locker.Lock()
            i++
            locker.Unlock()
            locker.Close()
        }
    })
}

func BenchmarkXsync(b *testing.B) {
    var i = 0
    l := NewMultilocker()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu := l.Get("test")
            mu.Lock()
            i++
            mu.Unlock()
            l.Put("test")
        }
    })
}
jarlyyn
jarlyyn · #11 · 5年之前

Running tool: /home/jarlyyn/goroot/go1.13/bin/go test -benchmem -run=^$ -bench .

goos: linux
goarch: amd64
BenchmarkUtil-12          8520894           136 ns/op          87 B/op           2 allocs/op
BenchmarkXsync-12         1460154           834 ns/op         104 B/op           2 allocs/op
PASS
ok      _/tmp/test    3.364s
kzh125
kzh125 · #12 · 5年之前

你这个代码有bug:

我举一个极端的例子:

假设有10个并发传入了相同的key,其中goroutine_01先进行count++并获取了Locker结构体

另外9个卡在了l := v.(*Locker)这一行和下一行之间

之后goroutine_01又进行count--并从map中删除了Locker结构体,

然后另外9个goroutine调用的Locker函数返回,从而获取了与goroutine_01相同的Locker结构体

之后又来了10个并发传入了相同key,由于map中已经没有这个key关联的Locker结构体,所以获取的是一个新的Locker结构体

也就是说后面的10个,和前面剩余的9个用的不是同一把锁,也就起不到加锁的作用

jarlyyn
jarlyyn · #13 · 5年之前
kzh125kzh125 #12 回复

你这个代码有bug: 我举一个极端的例子: 假设有10个并发传入了相同的key,其中`goroutine_01`先进行`count++`并获取了`Locker`结构体 另外9个卡在了`l := v.(*Locker)`这一行和下一行之间 之后`goroutine_01`又进行`count--`并从map中删除了`Locker`结构体, 然后另外9个goroutine调用的`Locker`函数返回,从而获取了与`goroutine_01`相同的`Locker`结构体 之后又来了10个并发传入了相同key,由于map中已经没有这个key关联的`Locker`结构体,所以获取的是一个新的`Locker`结构体 也就是说后面的10个,和前面剩余的9个用的不是同一把锁,也就起不到加锁的作用

????

count++和count --都用同一个countlocker加锁。

你在说什么?

你想告诉我,goroutine_01加锁解锁countlocker,进行业务,再加锁结锁countlocker,这段时间内,其他的go都获取不到这个锁?

这个锁是goroutine_01专用的?

你是想说golang的sync.Mutex有bug?

jarlyyn
jarlyyn · #14 · 5年之前
kzh125kzh125 #12 回复

你这个代码有bug: 我举一个极端的例子: 假设有10个并发传入了相同的key,其中`goroutine_01`先进行`count++`并获取了`Locker`结构体 另外9个卡在了`l := v.(*Locker)`这一行和下一行之间 之后`goroutine_01`又进行`count--`并从map中删除了`Locker`结构体, 然后另外9个goroutine调用的`Locker`函数返回,从而获取了与`goroutine_01`相同的`Locker`结构体 之后又来了10个并发传入了相同key,由于map中已经没有这个key关联的`Locker`结构体,所以获取的是一个新的`Locker`结构体 也就是说后面的10个,和前面剩余的9个用的不是同一把锁,也就起不到加锁的作用

然后你所说的这个bug,

在你完全没加锁的代码里,是怎么做到不发生的呢?

kzh125
kzh125 · #15 · 5年之前

sync.Mutex没有bug,是你的代码的bug。

我举的是只是例子而已。实际上有可能是两个goroutine,其中一个count--后从map删除Locker,之后又来一个goroutine获取了新的不同的锁,起不到加锁的作用。

我的代码 count++ 和count-- 以及其他变量被限制在同一个goroutine里,所以不会有这种bug。

kzh125
kzh125 · #16 · 5年之前

不能期望所有goroutine按照期望的顺序执行代码。

就像你期望goroutine_01 使用countlocker 加锁进行 count--之前,另外一个goroutine_02能获取到countlocker 并进行count++

但这种直觉是不正确的,也有可能是goroutine_01 又获取到了countlocker并进行count--

这也导致了bug的产生

jarlyyn
jarlyyn · #17 · 5年之前
kzh125kzh125 #16 回复

不能期望所有goroutine按照期望的顺序执行代码。 就像你期望goroutine_01 使用countlocker 加锁进行 count--之前,另外一个goroutine_02能获取到countlocker 并进行count++ 但这种直觉是不正确的,也有可能是goroutine_01 又获取到了countlocker并进行count-- 这也导致了bug的产生

看了遍你的代码,知道你要干什么了。

你要解决的不是并发的问题,是要解决唯一的问题。

那更简单,一个全局锁的问题。

package xsync

import "sync"

func NewUtil() *Util {
    return &Util{
        locks: map[interface{}]*Locker{},
    }
}

type Util struct {
    locker sync.Mutex
    locks  map[interface{}]*Locker
}

func (u *Util) Locker(key interface{}) (*Locker, bool) {
    u.locker.Lock()
    l, ok := u.locks[key]
    if !ok {
        l = &Locker{
            Util: u,
            Key:  key,
        }
        u.locks[key] = l
    }
    u.locker.Unlock()
    return l, ok
}

type Locker struct {
    sync.RWMutex
    Util  *Util
    count int
    Key   interface{}
}

func (l *Locker) Close() {
    l.Util.locker.Lock()
    l.count--
    if l.count == 0 {
        delete(l.Util.locks, l.Key)
    }

    l.Util.locker.Unlock()
}
jarlyyn
jarlyyn · #18 · 5年之前
Running tool: /home/jarlyyn/goroot/go1.13/bin/go test -benchmem -run=^$ -bench .

goos: linux
goarch: amd64
BenchmarkUtil-12          5250324           217 ns/op           0 B/op           0 allocs/op
BenchmarkXsync-12         1479282           848 ns/op         104 B/op           2 allocs/op
PASS
ok      _/tmp/test    3.459s
jarlyyn
jarlyyn · #19 · 5年之前
kzh125kzh125 #16 回复

不能期望所有goroutine按照期望的顺序执行代码。 就像你期望goroutine_01 使用countlocker 加锁进行 count--之前,另外一个goroutine_02能获取到countlocker 并进行count++ 但这种直觉是不正确的,也有可能是goroutine_01 又获取到了countlocker并进行count-- 这也导致了bug的产生

其实你想要的可能是这样一个东西

package xsync

import "sync"

type lockerPerKey struct {
    sync.Mutex
    count int
}
type XLocker struct {
    locker sync.Mutex
    locks  map[interface{}]*lockerPerKey
}

func NewXLocker() *XLocker {
    return &XLocker{
        locks: map[interface{}]*lockerPerKey{},
    }
}
func (l *XLocker) Lock(key interface{}) {
    l.locker.Lock()
    locker := l.locks[key]
    if locker == nil {
        locker = &lockerPerKey{}
        l.locks[key] = locker
    }
    locker.count++
    l.locker.Unlock()
    locker.Lock()
}

func (l *XLocker) Unlock(key interface{}) {
    l.locker.Lock()
    locker := l.locks[key]
    locker.Unlock()
    locker.count--
    if locker.count == 0 {
        delete(l.locks, key)
    }
    l.locker.Unlock()
}
package xsync

import (
    "testing"
)

func BenchmarkUtil(b *testing.B) {
    var i = 0
    u := NewUtil()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            locker, _ := u.Locker("test")
            locker.Lock()
            i++
            locker.Unlock()
            locker.Close()
        }
    })
}

func BenchmarkXsync(b *testing.B) {
    var i = 0
    l := NewMultilocker()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu := l.Get("test")
            mu.Lock()
            i++
            mu.Unlock()
            l.Put("test")
        }
    })
}

func BenchmarkXLocker(b *testing.B) {
    var i = 0
    u := NewXLocker()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            u.Lock("test")
            i++
            u.Unlock("test")
        }
    })
}
Running tool: /home/jarlyyn/goroot/go1.13/bin/go test -benchmem -run=^$ -bench .

goos: linux
goarch: amd64
BenchmarkUtil-12            4816842           215 ns/op           0 B/op           0 allocs/op
BenchmarkXsync-12           1501810           821 ns/op         104 B/op           2 allocs/op
BenchmarkXLocker-12        13804106            91.2 ns/op           0 B/op           0 allocs/op
PASS
ok      _/tmp/test    4.686s
kzh125
kzh125 · #20 · 5年之前
jarlyynjarlyyn #19 回复

#16楼 @kzh125 其实你想要的可能是这样一个东西 ```go package xsync import "sync" type lockerPerKey struct { sync.Mutex count int } type XLocker struct { locker sync.Mutex locks map[interface{}]*lockerPerKey } func NewXLocker() *XLocker { return &XLocker{ locks: map[interface{}]*lockerPerKey{}, } } func (l *XLocker) Lock(key interface{}) { l.locker.Lock() locker := l.locks[key] if locker == nil { locker = &lockerPerKey{} l.locks[key] = locker } locker.count++ l.locker.Unlock() locker.Lock() } func (l *XLocker) Unlock(key interface{}) { l.locker.Lock() locker := l.locks[key] locker.Unlock() locker.count-- if locker.count == 0 { delete(l.locks, key) } l.locker.Unlock() } ``` ``` package xsync import ( "testing" ) func BenchmarkUtil(b *testing.B) { var i = 0 u := NewUtil() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { locker, _ := u.Locker("test") locker.Lock() i++ locker.Unlock() locker.Close() } }) } func BenchmarkXsync(b *testing.B) { var i = 0 l := NewMultilocker() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { mu := l.Get("test") mu.Lock() i++ mu.Unlock() l.Put("test") } }) } func BenchmarkXLocker(b *testing.B) { var i = 0 u := NewXLocker() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { u.Lock("test") i++ u.Unlock("test") } }) } ``` ``` Running tool: /home/jarlyyn/goroot/go1.13/bin/go test -benchmem -run=^$ -bench . goos: linux goarch: amd64 BenchmarkUtil-12 4816842 215 ns/op 0 B/op 0 allocs/op BenchmarkXsync-12 1501810 821 ns/op 104 B/op 2 allocs/op BenchmarkXLocker-12 13804106 91.2 ns/op 0 B/op 0 allocs/op PASS ok _/tmp/test 4.686s ```

这个版本看起来好像没是OK的,我研究研究......

liux95
liux95 · #21 · 4年之前

看大佬们讨论真涨知识

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传