Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

aceld · · 593 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) --- 首先我们要先定义KisFlow的核心结构体,`KisFlow`结构体,通过上述的设计理念,我们得知,KisFlow表示整个一条数据计算流的结构。其中每次数据在一个Flow上,依次执行挂载在当前Flow的Function。 ## 2.3.1 KisFunction家族 KisFunction应该是一个链式调用,所以结构体的基本形态应该是一个链表,通过一次Function的调用结束后,默认可以调度到下一个Function的节点上。 在KisFlow中,一共有 `save`,`load`, `calculate`, `extend`, `varify`等多种行为的Funciton,所以这里我们采用上述五种function的模板类,方便今后在不同针对不同特征的function做更加灵活和功能隔离的拓展和改造。 整体的KisFunction的类图设计如下: ![image.png](https://static.golangjob.cn/240226/9dcbdc5ef2e2894607ee81d399534e09.png) ## 2.2.2 抽象层KisFunction定义 在`kis-flow`中创建一个新的目录`function`用来存放function的代码。 首先抽象接口编写在`kis/`目录下。 > kis-flow/kis/function.go ```go package kis import ( "context" "kis-flow/config" ) // Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元, // 任意个KisFunction可以组合成一个KisFlow type Function interface { // Call 执行流式计算逻辑 Call(ctx context.Context, flow Flow) error // SetConfig 给当前Function实例配置策略 SetConfig(s *config.KisFuncConfig) error // GetConfig 获取当前Function实例配置策略 GetConfig() *config.KisFuncConfig // SetFlow 给当前Function实例设置所依赖的Flow实例 SetFlow(f Flow) error // GetFlow 获取当前Functioin实力所依赖的Flow GetFlow() Flow // CreateId 给当前Funciton实力生成一个随机的实例KisID CreateId() // GetId 获取当前Function的FID GetId() string // GetPrevId 获取当前Function上一个Function节点FID GetPrevId() string // GetNextId 获取当前Function下一个Function节点FID GetNextId() string // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil Next() Function // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil Prev() Function // SetN 设置下一层Function实例 SetN(f Function) // SetP 设置上一层Function实例 SetP(f Function) } ``` ## 2.2.3 KisId随机唯一实例ID 上述提出了一个新的概念`KisId`。 KisID为`Function`的实例ID,用于KisFlow内部区分不同的实例对象。KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,而KisId则为在KisFlow中KisFunction已经实例化的 实例对象ID 这个ID是随机生成且唯一。 创建`kis-flow/id/`目录,且创建kis_id.go 文件,实现有关kis_id生成的算法。 > kis-flow/id/kis_id.go ```go package id import ( "github.com/google/uuid" "kis-flow/common" "strings" ) // KisID 获取一个中随机实例ID // 格式为 "prefix1-[prefix2-][prefix3-]ID" // 如:flow-1234567890 // 如:func-1234567890 // 如: conn-1234567890 // 如: func-1-1234567890 func KisID(prefix ...string) (kisId string) { idStr := strings.Replace(uuid.New().String(), "-", "", -1) kisId = formatKisID(idStr, prefix...) return } func formatKisID(idStr string, prefix ...string) string { var kisId string for _, fix := range prefix { kisId += fix kisId += common.KisIdJoinChar } kisId += idStr return kisId } ``` `kisId`模块提供`KisID()`方法,这里面依赖了第三方分布式ID生成库`github.com/google/uuid`,生成的随机ID为一个字符串,且调用者可以提供多个前缀,通过`-`符号进行拼接,得到的随机字符串ID,如:`func-1234567890` 针对KisId的前缀,提供了一些字符串的枚举,如下: > kis-flow/common/const.go ```go // KisIdType 用户生成KisId的字符串前缀 const ( KisIdTypeFlow = "flow" KisIdTypeConnnector = "conn" KisIdTypeFunction = "func" KisIdTypeGlobal = "global" KisIdJoinChar = "-" ) ``` ## 2.2.4 BaseFunction基础父类 按照设计,我们需要提供一个`BaseFunction`作为`Function`的一个子类,实现一些基础的功能接口。留下`Call()`让具体模式的`KisFunctionX`来重写实现,下面来进行对BaseFunction结构的定义。 在 `kis-flow/function/`创建`kis_base_funciton.go` 文件。 ### A. 结构定义 > kis-flow/function/kis_base_function.go ```go package function import ( "context" "errors" "kis-flow/common" "kis-flow/config" "kis-flow/id" "kis-flow/kis" ) type BaseFunction struct { // Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象 Id string Config *config.KisFuncConfig // flow Flow kis.Flow //上下文环境KisFlow // link N kis.Function //下一个流计算Function P kis.Function //上一个流计算Function } ``` ### B. 方法实现 > kis-flow/function/kis_base_function.go ```go // Call // BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法 func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil } func (base *BaseFunction) Next() kis.Function { return base.N } func (base *BaseFunction) Prev() kis.Function { return base.P } func (base *BaseFunction) SetN(f kis.Function) { base.N = f } func (base *BaseFunction) SetP(f kis.Function) { base.P = f } func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error { if s == nil { return errors.New("KisFuncConfig is nil") } base.Config = s return nil } func (base *BaseFunction) GetId() string { return base.Id } func (base *BaseFunction) GetPrevId() string { if base.P == nil { //Function为首结点 return common.FunctionIdFirstVirtual } return base.P.GetId() } func (base *BaseFunction) GetNextId() string { if base.N == nil { //Function为尾结点 return common.FunctionIdLastVirtual } return base.N.GetId() } func (base *BaseFunction) GetConfig() *config.KisFuncConfig { return base.Config } func (base *BaseFunction) SetFlow(f kis.Flow) error { if f == nil { return errors.New("KisFlow is nil") } base.Flow = f return nil } func (base *BaseFunction) GetFlow() kis.Flow { return base.Flow } func (base *BaseFunction) CreateId() { base.Id = id.KisID(common.KisIdTypeFunction) } ``` 这里注意 `GetPrevId()`和 `GetNextId()`两个方法实现,因为如果当前`Functioin`为双向链表中的第一个节点或者最后一个节点,那么他们的上一个或者下一个是没有节点的,那么ID也就不存在,为了在使用中不出现得不到ID的情况,我们提供了两个虚拟FID,做特殊情况的边界处理,定义在const.go中。 > kis-flow/common/const.go ```go const ( // FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID FunctionIdFirstVirtual = "FunctionIdFirstVirtual" // FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID FunctionIdLastVirtual = "FunctionIdLastVirtual" ) ``` ## 2.2.5 KisFunction的V/S/L/C/E等模式类定义 下面分别实现V/S/L/C/E 五种不同模式的KisFunction子类。这里分别用创建文件来实现。 ### A. KisFunctionV > kis-flow/function/kis_function_v.go ```go package function import ( "context" "fmt" "kis-flow/kis" ) type KisFunctionV struct { BaseFunction } func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionV, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 return nil } ``` ### B. KisFunctionS > kis-flow/function/kis_function_s.go ```go package function import ( "context" "fmt" "kis-flow/kis" ) type KisFunctionS struct { BaseFunction } func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionS, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 return nil } ``` ### C. KisFunctionL > kis-flow/function/kis_function_l.go ```go package function import ( "context" "fmt" "kis-flow/kis" ) type KisFunctionL struct { BaseFunction } func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionL, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 return nil } ``` ### D. KisFunctionC > kis-flow/function/kis_function_c.go ```go package function import ( "context" "fmt" "kis-flow/kis" ) type KisFunctionC struct { BaseFunction } func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunction_C, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 return nil } ``` ### E. KisFunctionE > kis-flow/function/kis_function_e.go ```go package function import ( "context" "fmt" "kis-flow/kis" ) type KisFunctionE struct { BaseFunction } func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionE, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 return nil } ``` ## 2.2.6 创建KisFunction实例 下面提供一个创建具体模式`Function`的方法,这里采用简单工厂方法模式来实现创建对象。 > kis-flow/function/kis_base_function.go ```go func (base *BaseFunction) CreateId() { base.Id = id.KisID(common.KisIdTypeFunction) } // NewKisFunction 创建一个NsFunction // flow: 当前所属的flow实例 // s : 当前function的配置策略 func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { var f kis.Function //工厂生产泛化对象 switch common.KisMode(config.FMode) { case common.V: f = new(KisFunctionV) break case common.S: f = new(KisFunctionS) case common.L: f = new(KisFunctionL) case common.C: f = new(KisFunctionC) case common.E: f = new(KisFunctionE) default: //LOG ERROR return nil } // 生成随机实例唯一ID f.CreateId() //设置基础信息属性 if err := f.SetConfig(config); err != nil { panic(err) } if err := f.SetFlow(flow); err != nil { panic(err) } return f } ``` 注意 `NewKisFunction()`方法返回的是一个抽象的interface `Function`。 还要注意,目前到这里没有实现`Flow`对象,但是KisFunciton的创建需要依赖传递一个`Flow`对象,我们现在可以暂时简单创建一个`Flow`对象的构造方法,之后在实现`Flow`章节再完善这部分的代码。 在 `kis-filw/kis/`中创建`flow.go`文件。 > kis-flow/kis/flow.go ```go package kis import ( "context" "kis-flow/config" ) type Flow interface { // TODO } ``` 在`kis-flow/flow/`下创建`kis_flow.go`文件,实现如下: > kis-flow/flow/kis_flow.go ```go package flow import "kis-flow/config" // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { Id string Name string // TODO } // TODO for test // NewKisFlow 创建一个KisFlow. func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // 基础信息 flow.Id = id.KisID(common.KisIdTypeFlow) flow.Name = conf.FlowName return flow } ``` ### 2.2.7 单元测试KisFunction创建实例 现在来对上述的KisFunction实例的创建做一个简单的单元测试,在`kis-flow/test/`创建`kis_function_test.go`文件。 > kis-flow/test/kis_function_test.go ```go package test import ( "context" "kis-flow/common" "kis-flow/config" "kis-flow/flow" "kis-flow/function" "testing" ) func TestNewKisFunction(t *testing.T) { ctx := context.Background() // 1. 创建一个KisFunction配置实例 source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil) if myFuncConfig1 == nil { panic("myFuncConfig1 is nil") } // 2. 创建一个 KisFlow 配置实例 myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable) // 3. 创建一个KisFlow对象 flow1 := flow.NewKisFlow(myFlowConfig1) // 4. 创建一个KisFunction对象 func1 := function.NewKisFunction(flow1, myFuncConfig1) if err := func1.Call(ctx, flow1); err != nil { t.Errorf("func1.Call() error = %v", err) } } ``` 流程很简单,分为四个小步骤: 1. 创建一个KisFunction配置实例 2. 创建一个 KisFlow 配置实例 3. 创建一个KisFlow对象 4. 创建一个KisFunction对象 cd到`kis-flow/test/`目录下执行: ```bash go test -test.v -test.paniconexit0 -test.run TestNewKisFunction ``` 结果如下: ```bash === RUN TestNewKisFunction KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}} --- PASS: TestNewKisFunction (0.00s) PASS ok kis-flow/test 1.005s ``` 我们已经调用到了具体的`KisFunciton_C`实例的`Call()`方法。 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) --- 连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

593 次点击  
加入收藏 微博
1 回复  |  直到 2024-11-22 10:42:37
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传