singleflight.Group用途
这么说吧,singleflight.Group
是大并发下面的一把利器。一个请求如果访问1s,同样的请求来了500路,如果你的机器只能支持100路,处理完请求要5s。如果用上这个库,同样的机器,只要1s,是不是很犀利。
原因:singleflight.Group会缓存那一瞬间的并发请求(相同key值),这和普通的缓存还不一样。
一个简单的demo
这个例子使用var group singleflight.Group
缓存https://http2.golang.org/reqinfo
网站并发访问结果。 第一个go程成功访问https://http2.golang.org/reqinfo
网站内容,后面排队中的并发go程不会真的访问,都是从缓存里面拿结果。
注意: 要体会singleflight.Group和普通缓存的区别,可以把下面的go func(id int){}
,使用go程
起回调函数和去除go程起有什么区别,你会发现go程起的函数会缓存,不用go程起的函数没有缓存,请仔细体会这点。
- 声明,
var group singleflight.Group
- 使用,使用回调函数传参
res, err, _ := group.Do("reqinfo", func() (interface{}, error) {/*填写自己的回调函数内容*/}
group.Do第一个参数是缓存key的值,第二个参数传递需要缓存结果的回调函数
package main
import (
"fmt"
"github.com/guonaihong/gout"
"golang.org/x/sync/singleflight"
"sync"
)
func main() {
var group singleflight.Group
var wg sync.WaitGroup
wg.Add(10)
defer wg.Wait()
for i := 0; i < 10; i++ {
go func(id int) {
defer wg.Done()
res, err, _ := group.Do("reqinfo", func() (interface{}, error) {
s := ""
err := gout.GET("https://http2.golang.org/reqinfo").Debug(true).BindBody(&s).Do()
if err != nil {
return nil, err
}
return s, nil
})
if err != nil {
fmt.Printf("fail:%s\n", err)
}
fmt.Printf("id(%d) ------>%s\n", id, res.(string))
}(i)
}
}
深入了解数据结构call + Group + Result
-
type call struct
:重点聊下-
wg sync.WaitGroup
调用Do接口时,取缓存的go程会等待有机会执行doCall的go程,这是一种go程同步的手段,方法还有很多,比如:close(chan),新建context.Context调用下cancel函数等等。 -
val interface{}
存放结果 -
err error
存放error -
forgotten bool
Forget接口主动删除会设置为true -
dups int
瞬间读取缓存并发数 -
chans []chan<- Result
存放调用DoChan接口
但是没有机会执行doCall函数的go程,这里保存它们的chan,取完结果,就可以循环发给它们。
-
-
type Group struct
:很常见的key/val设计,map加mutex,没什么好聊的 -
type Result struct
:没什么好看的,保存缓存值Val interface{}
,错误Err error
,以及这个值是否共享Shared bool
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
Do接口
Do干的事情就是有缓存取缓存的值,没有就真访问一次,细节往下看。
- 先是常规3连发,加锁,初始化map。
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
- 取缓存值,
c.wg.Wait()
这里是等待真取结果go程结束。c.dups
更新引用计数,g.mu.Unlock()解锁
,这两个没什么疑问。
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
- 设置缓存
-
c.wg.Add(1)
,有机会执行doCall的go程,表明自己还在干活中 -
g.m[key] = c
,构造一个*call放到map里面
-
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
-
doCall
的解析在后面
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
doCall 函数
-
c.val, c.err = fn()
我们传递的回答函数执行结束,有结果或者有错误 -
c.wg.Done()
通知调用Do接口没有机会得到doCall执行权限的go,让它们取下结果 - 删除缓存,这3行代码是重点,取到结果之后就删除,所以是并发缓存。当然在删除结果之前,等待的一众go程已如狼似虎取完结果,拍拍屁股走人。
g.mu.Lock()
if !c.forgotten {
delete(g.m, key)
}
- 通知DoChan等待的一众go程取结果。
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
if !c.forgotten {
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
DoChan接口
这个接口没什么好聊的和Do接口差不多,无非是c.wg.Add(1)
这行代码,可以让
Do
和DoChan
混用同一个key
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
Forget接口
删除某个key下面的缓存
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
有疑问加站长微信联系(非本文作者)