Golang框架实战-KisFlow流式计算框架(5)-Function调度

aceld · · 174 次点击 · 开始浏览    置顶

连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) ## 4.1 Router 现在,将KisFlow提供对外Function开放注册能力,首先我们要定义一些注册函数原型,和管理这些Function的Router映射关系类型。 创建`kis-flow/kis/router.go`,定义原型如下: > kis-flow/kis/router.go ```go package kis import "context" // FaaS Function as a Service type FaaS func(context.Context, Flow) error // funcRouter // key: Function Name // value: Function 回调自定义业务 type funcRouter map[string]FaaS // flowRouter // key: Flow Name // value: Flow type flowRouter map[string]Flow ``` `FaaS`:是开发者给KisFlow注册的Function回调业务函数原型,需要传递两个参数,Context和Flow,Context主要承载业务的上线文环境,Flow主要承载KisFlow的上下文环境,我们可以通过Flow获取当前Function的配置信息,当前Function的数据信息,已经Flow上其他节点的Function相关信息等。 `funcRouter`: 管理FunctionName和FaaS业务回调的映射关系Map,是一个私有类型,不对外提供引用。需要注意的是,funcRouter的key是FunctionName,因为FunctionId是生成的随机Id,开发者在注册路由的时候,并无法预判和可读,所以关联的业务回调是与FunctionName做的映射关系。 `flowRouter`:管理FlowName和Flow实例的映射关系Map,是一个私有类型,不对外提供引用。`flowRouter`依然是FlowName的映射关系。 ## 4.2 KisPool KisFlow提供一个用来管理全部全局映射关系的类`KisPool`,KisPool包含Router,且提供对Router的管理能力。 ## 4.2.1 KisPool的定义 创建 `kis-flow/kis/pool.go` 文件,来创建`kis_pool`模块。 > kis-flow/kis/pool.go ```go package kis import ( "context" "errors" "fmt" "kis-flow/log" "sync" ) var _poolOnce sync.Once // kisPool 用于管理全部的Function和Flow配置的池子 type kisPool struct { fnRouter funcRouter // 全部的Function管理路由 fnLock sync.RWMutex // fnRouter 锁 flowRouter flowRouter // 全部的flow对象 flowLock sync.RWMutex // flowRouter 锁 } // 单例 var _pool *kisPool // Pool 单例构造 func Pool() *kisPool { _poolOnce.Do(func() { //创建kisPool对象 _pool = new(kisPool) // fnRouter初始化 _pool.fnRouter = make(funcRouter) // flowRouter初始化 _pool.flowRouter = make(flowRouter) }) return _pool } ``` `kis_pool`采用单例模式,`Pool()`方法为获取当前的单例,有关`fnRouter`和 `flowRouter`在生命周期只会初始化一次,通过`sync.Once`来控制。 ## 4.2.2 注册及获取Flow KisPool可以提供添加和获取Flow信息的接口,如下: > kis-flow/kis/pool.go ```go func (pool *kisPool) AddFlow(name string, flow Flow) { pool.flowLock.Lock() defer pool.flowLock.Unlock() if _, ok := pool.flowRouter[name]; !ok { pool.flowRouter[name] = flow } else { errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name) panic(errString) } log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name) } func (pool *kisPool) GetFlow(name string) Flow { pool.flowLock.RLock() defer pool.flowLock.RUnlock() if flow, ok := pool.flowRouter[name]; ok { return flow } else { return nil } } ``` AddFlow会根据相同的FlowName进行做重复校验,相同的Flow无法注册多次。 ## 4.2.3 注册及调度Function KisPool提供注册Funciton回调和调度Funciton方法, 如下。 > kis-flow/kis/pool.go ```go // FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册 func (pool *kisPool) FaaS(fnName string, f FaaS) { pool.fnLock.Lock() defer pool.fnLock.Unlock() if _, ok := pool.fnRouter[fnName]; !ok { pool.fnRouter[fnName] = f } else { errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName) panic(errString) } log.Logger().InfoF("Add KisPool FuncName=%s", fnName) } // CallFunction 调度 Function func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error { if f, ok := pool.fnRouter[fnName]; ok { return f(ctx, flow) } log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName) return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.") } ``` 在`CallFunction()`中,需要传递参数Flow,作为数据流调度的上下文环境。 开发者在自定义FaaS中可以通过Flow来获取一些Funciton信息,所以这里需要给Flow补充几个获取配置信息的接口,之后如果再需要,再继续补充,具体如下: > kis-flow/kis/flow.go ```go type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // ++++++++++++++++++++++++++++++++++ // GetName 得到Flow的名称 GetName() string // GetThisFunction 得到当前正在执行的Function GetThisFunction() Function // GetThisFuncConf 得到当前正在执行的Function的配置 GetThisFuncConf() *config.KisFuncConfig } ``` > kis-flow/flow/kis_flow.go ```go func (flow *KisFlow) GetName() string { return flow.Name } func (flow *KisFlow) GetThisFunction() kis.Function { return flow.ThisFunction } func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig { return flow.ThisFunction.GetConfig() } ``` ## 4.3 KisFunction引用KisPool调度 现在,我们就可以在KisFunctionX中的`Call()`来通过Pool进行调度了,依次修改每个Function的Call()方法。 > kis-flow/function/kis_function_c.go ```go package function import ( "context" "kis-flow/kis" "kis-flow/log" ) type KisFunctionC struct { BaseFunction } func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow) // 通过KisPool 路由到具体的执行计算Function中 if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) return err } return nil } ``` > kis-flow/function/kis_function_e.go ```go package function import ( "context" "kis-flow/kis" "kis-flow/log" ) type KisFunctionE struct { BaseFunction } func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow) // 通过KisPool 路由到具体的执行计算Function中 if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) return err } return nil } ``` > kis-flow/function/kis_function_l.go ```go package function import ( "context" "kis-flow/kis" "kis-flow/log" ) type KisFunctionL struct { BaseFunction } func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow) // 通过KisPool 路由到具体的执行计算Function中 if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) return err } return nil } ``` > kis-flow/function/kis_function_s.go ```go package function import ( "context" "kis-flow/kis" "kis-flow/log" ) type KisFunctionS struct { BaseFunction } func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow) // 通过KisPool 路由到具体的执行计算Function中 if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) return err } return nil } ``` > kis-flow/function/kis_function_v.go ```go package function import ( "context" "kis-flow/kis" "kis-flow/log" ) type KisFunctionV struct { BaseFunction } func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow) // 通过KisPool 路由到具体的执行计算Function中 if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) return err } return nil } ``` ## 4.4 KisPool单元测试 接下来我们来针对KisPool进行单元测试。 ### 4.4.1 自定义FaaS > kis-flow/test/kis_pool_test.go ```go package test import ( "context" "fmt" "kis-flow/common" "kis-flow/config" "kis-flow/flow" "kis-flow/kis" "testing" ) func funcName1Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName1Handler ----") for index, row := range flow.Input() { // 打印数据 str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) // 计算结果数据 resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // 提交结果数据 _ = flow.CommitRow(resultStr) } return nil } func funcName2Handler(ctx context.Context, flow kis.Flow) error { for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return nil } ``` ### 4.4.2 注册FaaS及启动Flow > kis-flow/test/kis_pool_test.go ```go func TestNewKisPool(t *testing.T) { ctx := context.Background() // 0. 注册Function kis.Pool().FaaS("funcName1", funcName1Handler) kis.Pool().FaaS("funcName2", funcName2Handler) // 1. 创建2个KisFunction配置实例 source1 := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } source2 := config.KisSource{ Name: "用户订单错误率", Must: []string{"order_id", "user_id"}, } myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil) if myFuncConfig1 == nil { panic("myFuncConfig1 is nil") } myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) if myFuncConfig2 == nil { panic("myFuncConfig2 is nil") } // 2. 创建一个 KisFlow 配置实例 myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable) // 3. 创建一个KisFlow对象 flow1 := flow.NewKisFlow(myFlowConfig1) // 4. 拼接Functioin 到 Flow 上 if err := flow1.Link(myFuncConfig1, nil); err != nil { panic(err) } if err := flow1.Link(myFuncConfig2, nil); err != nil { panic(err) } // 5. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 6. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` cd到`kis-flow/test/`下执行命令: ```bash go test -test.v -test.paniconexit0 -test.run TestNewKisPool ``` 结果如下: ```bash === RUN TestNewKisPool Add KisPool FuncName=funcName1 Add KisPool FuncName=funcName2 context.Background ====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]} In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0 In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1 In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2 --- PASS: TestNewKisPool (0.00s) PASS ok kis-flow/test 0.520s ``` 经过日志的详细校验,结果是符合我们预期的。 好了,现在Function的业务能力已经开放给开发者了,接下来我们来继续完善KisFlow的能力。 ## 4.5 【V0.3】源代码 https://github.com/aceld/kis-flow/releases/tag/v0.3 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

174 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传