图执行引擎那些事(二)

symphony09 · · 1543 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇中,确定了如何通过 DAG 来编排业务算子,解决了第一个主要的问题。 而怎么封装业务算子则是实现图执行引擎的第二个主要的问题。 这块的代码实现主要在 [symphony09/running](https://github.com/symphony09/running) 的 core.go 和 core_impl.go。 第一步可以先定义最简单的 Node 接口,然后逐步完善,例如: ```go // Node basic unit of execution type Node interface { Name() string // Run will be called when all deps solved or cluster invoke it Run() } ``` ## 输入与输出 如果把算子简单的看作一个函数,那么首先要解决的就是函数的输入输出问题,从引擎来看就是数据流的问题。 在 running 中,我将输入分为了三类:props、context、state ### Props 构造算子时输入的初始化参数。相关代码如下: ```go // Props provide build parameters for the node builder type Props interface { // Get return global value of the key Get(key string) (interface{}, bool) //SubGet node value of the key, deliver node name as sub SubGet(sub, key string) (interface{}, bool) } type BuildNodeFunc func(name string, props Props) (Node, error) ``` `BuildNodeFunc`定义了算子的构造函数,有了构造函数和初始化参数,引擎就可以在需要时创建算子。 ### Context 运行算子时输入的上下文参数。 这块我直接使用了 Go 标准库的 context 接口定义 ,所以除了传递 Request Scope 参数外,也可以用它实现超时控制。 相应的,Node 接口变为: ```go // Node basic unit of execution type Node interface { Name() string // Run will be called when all deps solved or cluster invoke it Run(ctx context.Context) } ``` ### State 和 Context 一样,state 属于 Request Scope 参数。 不同的是,ctx 由引擎外部输入,参数应当相对简单,并在传递过程中不做改变, 而 state 由算子产出,用于算子间数据传递,最后也作为执行结果输出到引擎外。 因此,state 可能被多个算子同时访问,需要考虑数据竞争问题。 相关代码如下: ```go // State store state of nodes type State interface { // Query return value of the key Query(key string) (interface{}, bool) // Update set a new value for the key Update(key string, value interface{}) // Transform set a new value for the key, according to the old value Transform(key string, transform TransformStateFunc) } type TransformStateFunc func(from interface{}) interface{} // Stateful a class of nodes that need record or query state type Stateful interface { Node // Bind deliver the state, should be called before engine run the node Bind(state State) } ``` 如果算子实现了 Bind 方法,引擎就会在运行算子前,先调用 Bind 方法绑定 state,这样算子就可以在运行时访问 state 了。 此外要说明一下的是 Transform 方法, 它和 Update 方法都用于更新参数值,不同的是 Transform 可以根据原值来更新。 考虑这么一个场景: 某个算子需要在原数组追加若干元素,分为以下三步: 1. 查询原值 2. 追加元素 3. 更新原值 如果在第2步和第三步中间,其他算子也更新了这个值,那么第三步就会覆盖其他算子的更新。 而 Transform 可以原子化地执行查询和更新操作,避免这种情况发生。 具体实现可以参考 [running/core_impl.go at main · symphony09/running (github.com)](https://github.com/symphony09/running/blob/main/core_impl.go) 中 StandardState 的实现。 ### 流程示意图 整个输入输出流程可以用下图表示: ![image-20220717134423702.png](https://static.golangjob.cn/220717/21381a0b35b43425c059fc7f45df0d83.png) ## 性能优化 上文已经基本确定了算子初始化和运行的流程,但是还有一个问题需要解决: 算子构造可能需要耗费大量时间,如果每次运行都重新构造算子,可能会导致整体性能低下。 在 running 中我使用了两个策略来优化这个问题。 ### Worker 池 池是避免重复初始化的一个实用的策略,引擎将算子封装到 Worker 中,然后通过池来管理 Worker。 在 Worker 执行完成后,可以将其放回池子,需要时再取出执行。 这里还要解决一个问题,在 Worker 执行前,需要保证算子是初始化状态,因此,算子还需要增加重置方法。 Node 接口相应改为: ```go // Node basic unit of execution type Node interface { Name() string // Run will be called when all deps solved or cluster invoke it Run(ctx context.Context) // Reset will be called when the node will no longer execute until the next execution plan Reset() } ``` 目前 running 中使用了 Go 标准库的 `sync.Pool`实现此功能,具体实现可以参考 engine.go,pool.go,pool_worker.go。 ### 预构建 虽然 Worker 池可以在整体上减少构造算子耗费的的时间,但是避免不了突发高频执行时可能产生的毛刺问题。 那么使用预先构造好的算子可以彻底解决这个问题。 这里的问题是如果直接复制预先构造好的算子,因为 Go 是浅复制,可能使得本不相关的执行过程相互影响。 解决方法是为算子显式地实现 Clone 方法: ```go //Cloneable a class of nodes that can be cloned type Cloneable interface { Node // Clone self Clone() Node } ``` 这样引擎就会调用 clone 方法,安全地复制预构建算子。 使用这两个策略就能最大可能地降低重复构建算子产生的性能损耗。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1543 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传