- 什么是缓存击穿
某时刻缓存失效,大批量请求涌来,导致db io过大
2. go如何解决大量请求涌来,动态即缓存并返回且使db即cache中间件最低限度的io操作
通过channel控制所有请求只有一个请求去持久缓存,其余等待并订阅该事件(也就是获取缓存的事件)。当缓存写入完成后,通过每个请求上线文多路通知所有请求并返回。
//具体业务获取策略获取定制器
type BaseWorker struct {
}
func (b *BaseWorker) Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc) {
cCtx, cancel = context.WithCancel(p)
return
}
func (b *BaseWorker) Do() (bool, interface{}) {
panic("implement run")
}
func (b *BaseWorker) Fallback() interface{} {
return nil
}
// TryGet 尝试获取对象
func (b *BaseWorker) TryGet() (bool, interface{}) {
panic("implement try get")
}
type IWorker interface {
Do() (bool, interface{})
Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc)
TryGet() (bool, interface{})
Fallback() interface{}
}
//核心多请求控制处理器
type parallel struct {
//请求信号(只有一个请求可获取)
ch chan struct{}
//父级上下文
pCxt context.Context
pCancel context.CancelFunc
tag string
//请求池容量
size int
//请求池标识
reqPool chan struct{} //连接池
//信号返回值
chRes interface{}
}
// NewParallel 实例化并行操作
func NewParallel(tag string, size int) *parallel {
p := ¶llel{
ch: make(chan struct{}, 1),
pCxt: context.Background(),
tag: tag,
size: size,
}
//初始化请求实例权
if size > 0 {
p.reqPool = make(chan struct{}, size)
for i := 0; i < size; i++ {
p.reqPool <- struct{}{}
}
}
p.pCxt, p.pCancel = context.WithCancel(context.Background())
p.ch <- struct{}{}
return p
}
// release 释放
func (p *parallel) release() {
select {
case p.ch <- struct{}{}:
p.pCxt, p.pCancel = context.WithCancel(context.Background())
default:
//并行重复写入
}
}
// tryGetReq 获取请求连接
func (p *parallel) tryGetReq() bool {
if p.size <= 0 {
return true
}
select {
case <-p.reqPool:
return true
default:
return false
}
}
// releaseReq 回收请求权
func (p *parallel) releaseReq() {
if p.size <= 0 {
return
}
//回收请求权
select {
case p.reqPool <- struct{}{}:
default:
}
}
// tryDoRes 获取结果值
type tryDoRes struct {
Tip string
Res interface{}
}
// TryDo 任务并行
func (p *parallel) TryDo(worker IWorker) (ret tryDoRes) {
//获取连接
ok := p.tryGetReq()
if !ok {
//超过连接池.需要做降级返回
ret.Res = worker.Fallback()
ret.Tip = "【熔断降级结果】"
return
}
ok, obj := worker.TryGet()
if ok {
ret.Res = obj
ret.Tip = "【正常读取缓存】"
return
}
//获取上下文
cCtx, cancel := worker.Context(p.pCxt)
select {
case <-p.ch:
_, ret.Res = worker.Do()
ret.Tip = "【执行操作】"
p.chRes = ret.Res
//消息通知
p.pCancel()
p.release()
case <-cCtx.Done():
//返回数据
cancel()
//拉取信号相应值
ret.Res = p.chRes
//_, ret.Res = worker.TryGet()
ret.Tip = "【事件通知】"
}
//释放请求
p.releaseReq()
return
}
//对象工厂
type factory struct {
pMap map[string]*parallel
rwLock *sync.RWMutex
}
func NewFactory() *factory {
return &factory{
pMap: make(map[string]*parallel),
rwLock: &sync.RWMutex{},
}
}
// Register 注册并行实例
func (f *factory) Register(tag string, size int) {
defer f.rwLock.Unlock()
f.rwLock.Lock()
f.pMap[tag] = NewParallel(tag, size)
return
}
// Get 获取实例
func (f *factory) Get(tag string) *parallel {
defer f.rwLock.RUnlock()
f.rwLock.RLock()
v, _ := f.pMap[tag]
return v
}
//具体模拟请求并使用
var (
//模拟一个缓存
cacheMap map[string]interface{} = make(map[string]interface{})
//构造并行对象池
fac = parallel.NewFactory()
)
//构造一个拉取对应业务缓存的工作单元
type UserCacheWorker struct {
parallel.BaseWorker //继承基础单元
}
func NewUserCacheWorker() *UserCacheWorker {
return &UserCacheWorker{}
}
//Do 具体工作业务(如何写入缓存)
func (b *UserCacheWorker) Do() (bool, interface{}) {
log.Println("写入缓存")
//模拟高Q延迟
time.Sleep(time.Second * 5)
cacheMap["user"] = map[string]interface{}{
"name": "lsh",
}
return true, cacheMap
}
func (b *UserCacheWorker) TryGet() (bool, interface{}) {
//读缓存策略
if len(cacheMap) > 0 {
return true, cacheMap
}
//返回结果
return false, nil
}
// Fallback 重写熔断管道
func (b *UserCacheWorker) Fallback() interface{} {
return map[string]interface{}{
"err": "from fallback",
}
}
func main() {
//注册并行操作器
fac.Register("user_cache", 0)
fmt.Println("====================================loop 1===========================================")
for i := 0; i < 100000; i++ {
go func(index int) {
obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
log.Println("[编号]:", index, "[结果]:", obj)
}(i)
}
time.Sleep(time.Second * 10)
//清空缓存
cacheMap = make(map[string]interface{})
time.Sleep(time.Second * 1)
fmt.Println("====================================loop 2===========================================")
//再次并行请求
for i := 0; i < 100000; i++ {
go func(index int) {
obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
log.Println("[编号]:", index, "[结果]:", obj)
}(i)
}
fmt.Scanln()
}
有疑问加站长微信联系(非本文作者)