golang-针对缓存击穿动态补充缓存策略

yzbzg · · 1003 次点击 · · 开始浏览    

  1. 什么是缓存击穿

某时刻缓存失效,大批量请求涌来,导致db io过大

      2. go如何解决大量请求涌来,动态即缓存并返回且使db即cache中间件最低限度的io操作

 通过channel控制所有请求只有一个请求去持久缓存,其余等待并订阅该事件(也就是获取缓存的事件)。当缓存写入完成后,通过每个请求上线文多路通知所有请求并返回。

     



//具体业务获取策略获取定制器

type BaseWorker struct {
}

func (b *BaseWorker) Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc) {
	cCtx, cancel = context.WithCancel(p)
	return
}

func (b *BaseWorker) Do() (bool, interface{}) {
	panic("implement run")
}

func (b *BaseWorker) Fallback() interface{} {

	return nil

}

// TryGet 尝试获取对象
func (b *BaseWorker) TryGet() (bool, interface{}) {
	panic("implement try get")
}

type IWorker interface {
	Do() (bool, interface{})
	Context(p context.Context) (cCtx context.Context, cancel context.CancelFunc)
	TryGet() (bool, interface{})
	Fallback() interface{}
}
//核心多请求控制处理器


type parallel struct {
	//请求信号(只有一个请求可获取)
	ch chan struct{}

	//父级上下文
	pCxt    context.Context
	pCancel context.CancelFunc
	tag     string
	//请求池容量
	size int
	//请求池标识
	reqPool chan struct{} //连接池

	//信号返回值
	chRes interface{}
}

// NewParallel  实例化并行操作
func NewParallel(tag string, size int) *parallel {
	p := &parallel{
		ch:   make(chan struct{}, 1),
		pCxt: context.Background(),
		tag:  tag,
		size: size,
	}

	//初始化请求实例权
	if size > 0 {
		p.reqPool = make(chan struct{}, size)
		for i := 0; i < size; i++ {
			p.reqPool <- struct{}{}
		}
	}

	p.pCxt, p.pCancel = context.WithCancel(context.Background())
	p.ch <- struct{}{}

	return p
}

// release 释放
func (p *parallel) release() {

	select {
	case p.ch <- struct{}{}:
		p.pCxt, p.pCancel = context.WithCancel(context.Background())

	default:
		//并行重复写入
	}
}

// tryGetReq 获取请求连接
func (p *parallel) tryGetReq() bool {

	if p.size <= 0 {
		return true
	}

	select {
	case <-p.reqPool:
		return true
	default:
		return false
	}

}

// releaseReq 回收请求权
func (p *parallel) releaseReq() {

	if p.size <= 0 {
		return
	}

	//回收请求权
	select {
	case p.reqPool <- struct{}{}:
	default:

	}

}

// tryDoRes 获取结果值
type tryDoRes struct {
	Tip string
	Res interface{}
}

// TryDo 任务并行
func (p *parallel) TryDo(worker IWorker) (ret tryDoRes) {

	//获取连接
	ok := p.tryGetReq()
	if !ok {
		//超过连接池.需要做降级返回
		ret.Res = worker.Fallback()
		ret.Tip = "【熔断降级结果】"
		return
	}

	ok, obj := worker.TryGet()
	if ok {
		ret.Res = obj
		ret.Tip = "【正常读取缓存】"

		return
	}

	//获取上下文
	cCtx, cancel := worker.Context(p.pCxt)
	select {
	case <-p.ch:

		_, ret.Res = worker.Do()
		ret.Tip = "【执行操作】"
		p.chRes = ret.Res
		//消息通知
		p.pCancel()
		p.release()

	case <-cCtx.Done():
		//返回数据
		cancel()
		//拉取信号相应值
		ret.Res = p.chRes
		//_, ret.Res = worker.TryGet()
		ret.Tip = "【事件通知】"

	}

	//释放请求
	p.releaseReq()
	return
}
//对象工厂

type factory struct {
	pMap   map[string]*parallel
	rwLock *sync.RWMutex
}

func NewFactory() *factory {
	return &factory{
		pMap:   make(map[string]*parallel),
		rwLock: &sync.RWMutex{},
	}
}

// Register 注册并行实例
func (f *factory) Register(tag string, size int) {

	defer f.rwLock.Unlock()
	f.rwLock.Lock()
	f.pMap[tag] = NewParallel(tag, size)
	return

}

// Get 获取实例
func (f *factory) Get(tag string) *parallel {
	defer f.rwLock.RUnlock()
	f.rwLock.RLock()
	v, _ := f.pMap[tag]
	return v
}
//具体模拟请求并使用

var (
	//模拟一个缓存
	cacheMap map[string]interface{} = make(map[string]interface{})
	//构造并行对象池
	fac = parallel.NewFactory()
)

//构造一个拉取对应业务缓存的工作单元
type UserCacheWorker struct {
	parallel.BaseWorker    //继承基础单元
}


func NewUserCacheWorker() *UserCacheWorker {
	return &UserCacheWorker{}
}

//Do 具体工作业务(如何写入缓存)
func (b *UserCacheWorker) Do() (bool, interface{}) {

	log.Println("写入缓存")
	//模拟高Q延迟
	time.Sleep(time.Second * 5)
	cacheMap["user"] = map[string]interface{}{
		"name": "lsh",
	}

	return true, cacheMap

}

func (b *UserCacheWorker) TryGet() (bool, interface{}) {

	//读缓存策略
	if len(cacheMap) > 0 {
		return true, cacheMap
	}

	//返回结果
	return false, nil
}

// Fallback 重写熔断管道
func (b *UserCacheWorker) Fallback() interface{} {

	return map[string]interface{}{
		"err": "from fallback",
	}

}
func main() {

	//注册并行操作器
	fac.Register("user_cache", 0)

	fmt.Println("====================================loop 1===========================================")

	for i := 0; i < 100000; i++ {

		go func(index int) {

			obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
			log.Println("[编号]:", index, "[结果]:", obj)

		}(i)

	}

	time.Sleep(time.Second * 10)

	//清空缓存
	cacheMap = make(map[string]interface{})
	time.Sleep(time.Second * 1)

	fmt.Println("====================================loop 2===========================================")

	//再次并行请求
	for i := 0; i < 100000; i++ {

		go func(index int) {

			obj := fac.Get("user_cache").TryDo(NewUserCacheWorker())
			log.Println("[编号]:", index, "[结果]:", obj)

		}(i)

	}
	fmt.Scanln()

}

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1003 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传