假如在开发一个http接口,在并发调用传入相同参数时,需要加锁,一个一个执行。在并发调用传入不同参数时,需要并发处理,不能加锁,这时需要一个基于key的加锁。
于是我开发了一个Multilocker
类,用法为:使用l := NewMultilocker()
获取Multilocker
对象,使用mu := l.Get("key")获取sync.mutex,然后使用这个mutex加锁解锁,最后使用l.Put("key")
归还这个sync.mutex
如果同一时间并发调用Get
传入的为相同的key,则获取的是同一个sync.Mutex,如果同一时间并发调用Get
传入的为不同的key,则获取的是不同的sync.Mutex
multilock.go
package xsync
import (
"sync"
)
type mutexItem struct {
mu sync.Mutex
counter int64
}
// mutexPool is not concurrent safe
type mutexPool struct {
pool []*mutexItem
}
func (p *mutexPool) Put(mi *mutexItem) {
p.pool = append(p.pool, mi)
}
func (p *mutexPool) Get() *mutexItem {
if len(p.pool) == 0 {
return &mutexItem{}
}
i := len(p.pool) - 1
mi := p.pool[i]
p.pool = p.pool[:i]
return mi
}
type messageGet struct {
key string
c chan *sync.Mutex
}
type messagePut string
// Multilocker is key based multiple locker
type Multilocker struct {
gets chan messageGet
puts chan messagePut
inUse map[string]*mutexItem
done chan struct{}
mp mutexPool
cp sync.Pool
}
// Get get key related sync.Mutex
func (l *Multilocker) Get(key string) *sync.Mutex {
msg := messageGet{
key: key,
// c: l.cp.Get().(chan *sync.Mutex),
c: make(chan *sync.Mutex, 1),
}
l.gets <- msg
mu := <-msg.c
// l.cp.Put(msg.c)
return mu
}
// Put release key related mutexItem
func (l *Multilocker) Put(key string) {
l.puts <- messagePut(key)
}
// Close will stop schedule
func (l *Multilocker) Close() {
close(l.done)
}
func (l *Multilocker) handleGet(msg *messageGet) {
key := msg.key
c := msg.c
mi, ok := l.inUse[key]
if !ok {
mi = l.mp.Get()
l.inUse[key] = mi
}
mi.counter++
c <- &mi.mu
}
func (l *Multilocker) handlePut(msg *messagePut) {
key := string(*msg)
mi, ok := l.inUse[key]
if !ok {
panic("should call lock first")
}
mi.counter--
if mi.counter == 0 {
l.mp.Put(mi)
delete(l.inUse, key)
}
}
func (l *Multilocker) schedule() {
loop:
for {
select {
case msg := <-l.gets:
l.handleGet(&msg)
case msg := <-l.puts:
l.handlePut(&msg)
case <-l.done:
break loop
}
}
}
// NewMultilocker return a new Multilocker
func NewMultilocker() *Multilocker {
l := &Multilocker{
gets: make(chan messageGet, 1000),
puts: make(chan messagePut, 1000),
inUse: make(map[string]*mutexItem),
cp: sync.Pool{
New: func() interface{} { return make(chan *sync.Mutex, 1) },
},
}
go l.schedule()
return l
}
multilock_test.go
package xsync
import (
"log"
"strconv"
"sync"
"testing"
"time"
)
func TestSingleLock(t *testing.T) {
l := NewMultilocker()
mu := l.Get("3")
mu.Lock()
mu.Unlock()
l.Put("3")
}
func TestMultiple(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("start1")
defer log.Println("end")
l := NewMultilocker()
var n sync.WaitGroup
for i := 0; i < 2000; i++ {
n.Add(1)
i := i
key := strconv.Itoa(i % 2)
// key := strconv.Itoa(i)
go func() {
defer n.Done()
mu := l.Get(key)
mu.Lock()
// log.Printf("%d - %s start", i, key)
time.Sleep(time.Millisecond * 10)
// log.Printf("%d - %s end", i, key)
mu.Unlock()
l.Put(key)
}()
}
n.Wait()
}
有疑问加站长微信联系(非本文作者)

…………
一个sync.Map,值是sync.Mutex类型就可以了……
找了下我的代码,大概是这样的
想了下,你想要实现的功能和我不完全一样。
我的是第一次unlock后就释放整个key的锁。
你应该是以最后一个元素unlock为准。
那就再加个计数,在计数为0后再delete元素就可以了。
说到底就是一个带锁的locker的map或者干脆是sync.map的问题么……
第一次unlock后就释放整个key的锁: 有点类似cache,第一次取的时候缓存没有,需要加锁获取到缓存里,后续直接从缓存获取。可以参考The Go Programming Language的第9章的
9.7. Example: Concurrent Non-Blocking Cache
的实现。这个9.7最后也用了一个单独的channel来实现,因为这样性能比较好。如果使用共享变量,老是要加锁解锁的访问共享变量的内容,并且写的代码容易出bug。使用channel实现的话,代码可读性高,易于理解。
毕竟Go比较著名的谚语是:Don't communicate by sharing memory, share memory by communicating.
我这个场景是同一时间相同key,需要先后执行,不能同一时间执行,所以需要counter。
我先看下你的代码
看了下你这个代码,确实不能实现并发时同一个key加锁先后执行:
假设有10个并发同时调用
Locker("key1")
,然后其中一个执行完毕并调用l.Map.Delete(l.Key)
删除,剩下9个。 后面又来了10个并发调用Locker("key1")
,则后来的10个并发和前面的9个拥有的不是同一个锁,无法起到加锁的作用。所以需要counter,但是不用channel的话,这个真不好实现的,纸上谈兵没用...另外我觉得你这个代码性能不够好,开始有10个并发同时调用
Locker("key1")
,之后第一个执行完毕后,剩下9个应当立刻返回(例如使用close channel来广播,参考9.7最后的实现),但实际上你这个还要加锁解锁。chan 本身就是基于锁实现的……
基于chan来实现无锁是不太可能有性能优势的。
这个你不用channel的话,
counter++
counter--
if counter ==0
,等相关代码,没办法保证在并发情况下没有race的,我这里有锁啊……
benchmark_test.go
你这个代码有bug:
我举一个极端的例子:
假设有10个并发传入了相同的key,其中
goroutine_01
先进行count++
并获取了Locker
结构体另外9个卡在了
l := v.(*Locker)
这一行和下一行之间之后
goroutine_01
又进行count--
并从map中删除了Locker
结构体,然后另外9个goroutine调用的
Locker
函数返回,从而获取了与goroutine_01
相同的Locker
结构体之后又来了10个并发传入了相同key,由于map中已经没有这个key关联的
Locker
结构体,所以获取的是一个新的Locker
结构体也就是说后面的10个,和前面剩余的9个用的不是同一把锁,也就起不到加锁的作用
????
count++和count --都用同一个countlocker加锁。
你在说什么?
你想告诉我,goroutine_01加锁解锁countlocker,进行业务,再加锁结锁countlocker,这段时间内,其他的go都获取不到这个锁?
这个锁是goroutine_01专用的?
你是想说golang的sync.Mutex有bug?
然后你所说的这个bug,
在你完全没加锁的代码里,是怎么做到不发生的呢?
sync.Mutex没有bug,是你的代码的bug。
我举的是只是例子而已。实际上有可能是两个goroutine,其中一个count--后从map删除Locker,之后又来一个goroutine获取了新的不同的锁,起不到加锁的作用。
我的代码 count++ 和count-- 以及其他变量被限制在同一个goroutine里,所以不会有这种bug。
不能期望所有goroutine按照期望的顺序执行代码。
就像你期望goroutine_01 使用countlocker 加锁进行 count--之前,另外一个goroutine_02能获取到countlocker 并进行count++
但这种直觉是不正确的,也有可能是goroutine_01 又获取到了countlocker并进行count--
这也导致了bug的产生
看了遍你的代码,知道你要干什么了。
你要解决的不是并发的问题,是要解决唯一的问题。
那更简单,一个全局锁的问题。
其实你想要的可能是这样一个东西
这个版本看起来好像没是OK的,我研究研究......
看大佬们讨论真涨知识