浅析 go sync包
背景介绍
尽管 Golang 推荐通过 channel 进行通信和同步,但在实际开发中 sync 包用得也非常的多
var a = 0
// 启动 100 个协程,需要足够大
// var lock sync.Mutex
for i := 0; i < 100; i++ {
go func(idx int) {
// lock.Lock()
// defer lock.Unlock()
a += 1
fmt.Printf("goroutine %d, a=%d\n", idx, a)
}(i)
}
// 等待 1s 结束主程序
// 确保所有协程执行完
time.Sleep(time.Second)
互斥锁sync.Mutex,读写锁sync.RWMutex
锁的一些概念及使用方法,
整个包围绕 Locker 进行,这是一个 interface:
type Locker interface {
Lock()
Unlock()
}
互斥锁 Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
使用须知:
一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞直到互斥锁被解锁(重新争抢对互斥锁的锁定)
对一个未锁定的互斥锁解锁将会产生运行时错误。
读写锁 RWMutex
func (rw *RWMutex) Lock() //写锁定
func (rw *RWMutex) Unlock() //写解锁
func (rw *RWMutex) RLock() //读锁定
func (rw *RWMutex) RUnlock() //读解锁
使用须知:
- 当有一个 goroutine 获得写锁定,其它无论是读锁定还是写锁定都将阻塞直到写解锁;
- 当有一个 goroutine 获得读锁定,其它读锁定任然可以继续;
- 当有一个或任意多个读锁定,写锁定将等待所有读锁定解锁之后才能够进行写锁定。所以说这里的读锁定(RLock)目的其实是告诉写锁定:有很多人正在读取数据,你给我站一边去,等它们读(读解锁)完你再来写(写锁定)。
var count int
var rw sync.RWMutex
func main() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 进入读操作...\n", n)
v := count
fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 进入写操作...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d 写入结束,新值为:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
sync.Waitgroup,sync.Once
WaitGroup
用于等待一组 goroutine 结束,用法很简单。它有三个方法:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
说明: Add 用来添加 goroutine 的个数。Done 执行一次数量减 1。Wait 用来等待结束.
注意: wg.Add() 方法一定要在 goroutine 开始前执行哦。
var wg sync.WaitGroup
for i, s := range seconds {
// 计数加 1
wg.Add(1)
go func(i, s int) {
// 计数减 1
defer wg.Done()
fmt.Printf("goroutine%d 结束\n", i)
}(i, s)
}
// 等待执行结束
wg.Wait()
fmt.Println("所有 goroutine 执行结束")
Once
func (o *Once) Do(f func())
使用 sync.Once 对象可以使得函数多次调用只执行一次
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
----
# 打印结果
Only once
sync.Map
sync.Map是一个并发版本的Go语言的map
- 使用Store(interface {},interface {})添加元素。
- 使用Load(interface {}) interface {}检索元素。
- 使用Delete(interface {})删除元素。
- 使用LoadOrStore(interface {},interface {}) (interface {},bool)检索或添加之前不存在的元素。如果键之前在map中存在,则返回的布尔值为true。
- 使用Range遍历元素。
var m sync.Map
// m:=&sync.Map{}
// 添加元素
m.Store(1, "one")
m.Store(2, "two")
// 迭代所有元素
m.Range(func(key, value interface{}) bool {
fmt.Printf("%d: %s\n", key.(int), value.(string))
return true
})
// 获取元素1
value, ok := m.Load(1)
fmt.Println(value,ok) //one true
// 返回已存value,否则把指定的键值存储到map中
value, loaded := m.LoadOrStore(1, "three")
fmt.Println(value,loaded) //one true
value1, loaded1 := m.LoadOrStore(3, "three")
fmt.Println(value1,loaded1) //three false
m.Delete(3)
sync.Pool
在 golang 中有一个池pool,目的:
复用已经使用过的对象,来达到优化内存使用和回收的目的。
说白了,一开始这个池子会初始化一些对象供你使用,如果不够了呢,自己会通过new产生一些,当你放回去了之后这些对象会被别人进行复用,当对象特别大并且使用非常频繁的时候可以大大的减少对象的创建和回收的时间。
简单案例
一共只有三个方法我们需要知道的:New、Put、Get
var pool = sync.Pool{
New: func() interface{} {
return "123"
},
}
func main() {
t := pool.Get().(string)
fmt.Println(t)
pool.Put("321")
t2 := pool.Get().(string)
fmt.Println(t2)
}
---输出:
123
321
源码结构分析
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
其实结构并不复杂,但是如果自己看的话有点懵。注意几个细节就ok。
- local这里面真正的是[P]poolLocal其中P就是GPM模型中的P,有多少个P数组就有多大,也就是每个P维护了一个本地的poolLocal。
- poolLocal里面维护了一个private一个shared,看名字其实就很明显了,private是给自己用的,而shared的是一个队列,可以给别人用的。注释写的也很清楚,自己可以从队列的头部存然后从头部取,而别的P可以从尾部取。
- victim这个从字面上面也可以知道,幸存者嘛,当进行gc的stw时候,会将local中的对象移到victim中去,也就是说幸存了一次gc,
1. Get的逻辑其实非常清晰:
- 如果 private 不是空的,那就直接拿来用
- 如果 private 是空的,那就先去本地的shared队列里面从头 pop 一个
- 如果本地的 shared 也没有了,那 getSlow 去拿,其实就是去别的P的 shared 里面偷,偷不到回去 victim 幸存者里面找
- 如果最后都没有,那就只能调用 New 方法创建一个了
2. Put逻辑就很简单了:
- 如果 private 没有,就放在 private
- 如果 private 有了,那么就放到 shared 队列的头部
在看一个例子:
Put之后GC后Get
var pool = sync.Pool{
New: func() interface{} {
return "123"
},
}
func main() {
t := pool.Get().(string)
fmt.Println(t)
pool.Put("321")
pool.Put("321")
pool.Put("321")
pool.Put("321")
runtime.GC()
time.Sleep(1 * time.Second)
t2 := pool.Get().(string)
fmt.Println(t2)
runtime.GC()
time.Sleep(1 * time.Second)
t2 = pool.Get().(string)
fmt.Println(t2)
}
---输出:
123
321
123
思考:
- 什么情况下适合使用sync.Pool呢?
- sync.Pool的对象什么时候会被回收呢?
- sync.Pool是如何实现线程安全的?
sync.Cond
有疑问加站长微信联系(非本文作者)