假如在开发一个http接口,在并发调用传入相同参数时,需要加锁,一个一个执行。在并发调用传入不同参数时,需要并发处理,不能加锁,这时需要一个基于key的加锁。
于是我开发了一个`Multilocker`类,用法为:使用`l := NewMultilocker()`获取`Multilocker`对象,使用mu := l.Get("key")获取sync.mutex,然后使用这个mutex加锁解锁,最后使用`l.Put("key")`归还这个sync.mutex
如果同一时间并发调用`Get`传入的为相同的key,则获取的是同一个sync.Mutex,如果同一时间并发调用`Get`传入的为不同的key,则获取的是不同的sync.Mutex
multilock.go
```go
package xsync
import (
"sync"
)
type mutexItem struct {
mu sync.Mutex
counter int64
}
// mutexPool is not concurrent safe
type mutexPool struct {
pool []*mutexItem
}
func (p *mutexPool) Put(mi *mutexItem) {
p.pool = append(p.pool, mi)
}
func (p *mutexPool) Get() *mutexItem {
if len(p.pool) == 0 {
return &mutexItem{}
}
i := len(p.pool) - 1
mi := p.pool[i]
p.pool = p.pool[:i]
return mi
}
type messageGet struct {
key string
c chan *sync.Mutex
}
type messagePut string
// Multilocker is key based multiple locker
type Multilocker struct {
gets chan messageGet
puts chan messagePut
inUse map[string]*mutexItem
done chan struct{}
mp mutexPool
cp sync.Pool
}
// Get get key related sync.Mutex
func (l *Multilocker) Get(key string) *sync.Mutex {
msg := messageGet{
key: key,
// c: l.cp.Get().(chan *sync.Mutex),
c: make(chan *sync.Mutex, 1),
}
l.gets <- msg
mu := <-msg.c
// l.cp.Put(msg.c)
return mu
}
// Put release key related mutexItem
func (l *Multilocker) Put(key string) {
l.puts <- messagePut(key)
}
// Close will stop schedule
func (l *Multilocker) Close() {
close(l.done)
}
func (l *Multilocker) handleGet(msg *messageGet) {
key := msg.key
c := msg.c
mi, ok := l.inUse[key]
if !ok {
mi = l.mp.Get()
l.inUse[key] = mi
}
mi.counter++
c <- &mi.mu
}
func (l *Multilocker) handlePut(msg *messagePut) {
key := string(*msg)
mi, ok := l.inUse[key]
if !ok {
panic("should call lock first")
}
mi.counter--
if mi.counter == 0 {
l.mp.Put(mi)
delete(l.inUse, key)
}
}
func (l *Multilocker) schedule() {
loop:
for {
select {
case msg := <-l.gets:
l.handleGet(&msg)
case msg := <-l.puts:
l.handlePut(&msg)
case <-l.done:
break loop
}
}
}
// NewMultilocker return a new Multilocker
func NewMultilocker() *Multilocker {
l := &Multilocker{
gets: make(chan messageGet, 1000),
puts: make(chan messagePut, 1000),
inUse: make(map[string]*mutexItem),
cp: sync.Pool{
New: func() interface{} { return make(chan *sync.Mutex, 1) },
},
}
go l.schedule()
return l
}
```
multilock_test.go
```go
package xsync
import (
"log"
"strconv"
"sync"
"testing"
"time"
)
func TestSingleLock(t *testing.T) {
l := NewMultilocker()
mu := l.Get("3")
mu.Lock()
mu.Unlock()
l.Put("3")
}
func TestMultiple(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("start1")
defer log.Println("end")
l := NewMultilocker()
var n sync.WaitGroup
for i := 0; i < 2000; i++ {
n.Add(1)
i := i
key := strconv.Itoa(i % 2)
// key := strconv.Itoa(i)
go func() {
defer n.Done()
mu := l.Get(key)
mu.Lock()
// log.Printf("%d - %s start", i, key)
time.Sleep(time.Millisecond * 10)
// log.Printf("%d - %s end", i, key)
mu.Unlock()
l.Put(key)
}()
}
n.Wait()
}
```
你这个代码有bug:
我举一个极端的例子:
假设有10个并发传入了相同的key,其中`goroutine_01`先进行`count++`并获取了`Locker`结构体
另外9个卡在了`l := v.(*Locker)`这一行和下一行之间
之后`goroutine_01`又进行`count--`并从map中删除了`Locker`结构体,
然后另外9个goroutine调用的`Locker`函数返回,从而获取了与`goroutine_01`相同的`Locker`结构体
之后又来了10个并发传入了相同key,由于map中已经没有这个key关联的`Locker`结构体,所以获取的是一个新的`Locker`结构体
也就是说后面的10个,和前面剩余的9个用的不是同一把锁,也就起不到加锁的作用
#12
更多评论
找了下我的代码,大概是这样的
func NewUtil() *Util {
return &Util{
locks: &sync.Map{},
}
}
type Util struct {
locks *sync.Map
}
func (u *Util) Locker(key string) (*Locker, bool) {
newlocker := &Locker{
Map: u.locks,
Key: key,
}
v, ok := u.locks.LoadOrStore(key, newlocker)
return v.(*Locker), ok
}
type Locker struct {
sync.RWMutex
Map *sync.Map
Key string
}
func (l *Locker) Unlock() {
l.RWMutex.Unlock()
l.Map.Delete(l.Key)
}
#2