# Micro In Action(尾声): 分布式计划任务
![Micro In Action](https://s1.ax1x.com/2020/03/28/GkhGy4.png)
> 本文作者:Che Dan
>
> 原文链接:https://medium.com/@dche423/micro-in-action-9-cron-job-dabec09058e1
本文是《Micro In Acton》 系列的最后一篇。
我们在前面的文章中已经探讨过了利于Micro创建和使用微服务的方方面面。 今天到了最后一个话题: 计划任务。
## 要作好发布式计划任务不容易
基本上每个系统都需要计划任务。 它们用来执行一些后台任务, 有的是在特点时间点运行一次, 有的是定期重复执行。
如果系统里只有一个节点, 那么事情非常简单直接。我们把Web服务和后台程序都放到这个节点上, 只要这个节点是正常的, 那么后台程序就可以顺利执行。
[8-1.png](https://static.studygolang.com/200328/83823aa15b8811434f982d508ed3c1e6.png)
但是当今的系统很少是单节点就可以搞定的, 我们需要考虑系统的鲁棒性和可扩展性。当系统的节点数是一个以后, 我们就不得不面对两个互相矛盾的问题: **重复执行**和**单点故障**。
![8-2.png](https://static.studygolang.com/200328/a7a0350f65eb0e291481e618ec8f27bb.png)
如果我们不改代码,只是往系统中填加更多完全相同的副本节点,如上图所示,必然会导致重复执行(因为每个节点中包含相同的后台程序)。要避免这些重复, 我们需要写代码去小心处理顺序执行逻辑,这很容易出错。
有些“聪明人”会调整架构。 他们把计划任务要执行的逻辑并入到Web服务中,用HTTP API 或gRPC API 把这些功能暴露出来。然后用操作系统的计划任务(例如crontab)来调用这些API,完成同样的功能。
![8-3.png](https://static.studygolang.com/200328/f7290ae64c13a237c1ba998e0e558ab0.png)
这个架构的好处是结构简单,并且可以避免重复执行。 操作系统触发计划任务把请求发给负载均衡, 然后负载均衡会把请求转到集群中的某一个节点。
但它还是无法避免单点故障。 如果运行计划任务的那台服务器出了问题, 任务还是无法执行。
还有一些其它的替代方法, 大多无法同时解决前面提到的两个问题。
比如为后台程序单独部署一个节点, 仍存在单点故障问题。
![8-4.png](https://static.studygolang.com/200328/e775ede9677044f03307c93f079480bf.png)
比如为后台程序部署多个节点,仍会发生重复执行。
![8-5.png](https://static.studygolang.com/200328/be419e52f672716e64e412c66b90358a.png)
所以应该怎么解决这个困难吗? 说到底,我们是遇到了分布式计算中一个经典问题: [**领导权选举问题**](https://en.wikipedia.org/wiki/Leader_election)**。**
我们希望保持所有的服务节点都是完全相同(至少大部分时间如此)并无状态的。 这样就可以保证整体系统可以方便地横向扩展。但有些时间 , 我们必须在这些对等节点中选出一个主节点, 用于执行少量关键任务(例如计划任务)。
分布式选举的方案并不需要自己从头实现, 很多服务发现系统都内置提供这种能力, 例如etcd, zookeeper 等等都有此功能。
有了分布式选举,就可以保证我们的计划任务只在单一的主节点上运行(因此避免了重复执行)。而如果当前主节点因为某种原因宕机了, 其它节点会自动共同选举出新的主节点,执行后续的任务。(因为没有单点故障)
![8-6.png](https://static.studygolang.com/200328/013237c7c149433a090e59d905735c46.png)
这其实就是Micro中处理计划任务所采用的方法。
------
## Micro 中的计划任务
Micro 通过`github.com/micro/go-micro/sync` 这个包提供了计划任务的功能。
不过, 与其它很多Micro功能一样,这是个“**隐藏**”功能。 没文档,没示例也没有测试用例可以参考。自己看代码学习,使用后果自负????.
关于这个功能, **本文可能是目前全网唯一一份资料**。
### 关键组件
此功能由几个接口与类组成。
其中最重要的是 `sync.Cron`接口:
```go
// Cron is a distributed scheduler using leader election
// and distributed task runners. It uses the leader and
// task interfaces.
type Cron interface {
Schedule(task.Schedule, task.Command) error
}
```
这个接口只包含一个方法`Schedule`,它接受俩参数。
第一个参数 `task.Schedule` 代表任务的时间计划。
```go
// Schedule represents a time or interval at which a task should run
type Schedule struct {
// When to start the schedule. Zero time means immediately
Time time.Time
// Non zero interval dictates an ongoing schedule
Interval time.Duration
}
```
这个类有两个属性, 一是任务执行的起始时间点, 另一个重是执行间隔间事。
方法的第二个参数 `task.Command` 定义了任务要执行的命令, 包含了名称和一个函数指针(指定具体的执行代码)。
```go
// Command to be executed
type Command struct {
Name string
Func func() error
}
```
### 设置计划任务
了解了上面的组件,可以很容易地创建并设置一个计划任务。
```go
package mainimport (
"time" "github.com/micro/go-micro"
"github.com/micro/go-micro/sync"
"github.com/micro/go-micro/sync/task"
"github.com/micro/go-micro/util/log"
)
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.cron.example"), // name the client service
)
// Initialise service
service.Init() cron := sync.NewCron()
cron.Schedule(
task.Schedule{Interval: 10 * time.Second},
task.Command{Name: "foo", Func: func() error {
log.Debug("finish command foo")
return nil
}},
)
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
```
像往常一样, 先创建并初始化一个`service`,然后用`sync.NewCron()`创建一个`Cron`接口。然后用`cron.Schedule`创建设置一个简单地任务,它会每隔10秒在控制台输出一行日志。
下面把程序跑起来:
```bash
go run main.go2020-03-06 16:47:43.696426 I | Transport [http] Listening on [::]:62880
2020-03-06 16:47:43.696518 I | Broker [http] Connected to [::]:62882
2020-03-06 16:47:43.696808 I | Registry [mdns] Registering node: com.foo.cron.example-eb3a033f-4a33-438e-a467-6d2978027f2b
```
你可能预期每隔10秒会看到一些日志输出,**但结果是啥也没有**。不报错,也没有日志。 为啥呢?
我们知道Micro有个约定: 所有关键组件都有内置的轻量组实现。 比如有基于mDNS的注册组件, 有基于HTTP的Broker实现。 有了他们,我们可以不安装任何外部依赖,在本地完成开发。
但计划任务这个功能打破了上述约定,想使用它, 必须依赖**etcd**。看看`sync.NewCron`的源码便知道是怎么回事儿了:
```go
func NewCron(opts ...Option) Cron {
...
if options.Leader == nil {
options.Leader = etcd.NewLeader()
}
...
}
```
如果调用时没有提供`Leader`选项(它的类型为`leader.Leader`),那么会创建一个默认的(`etcd.NewLeader`)。就是这一行使得我们必须依赖etcd。如果再进一步看看`etcd.NewLeader`的代码, 就能找到根源;
```go
func NewLeader(opts ...leader.Option) leader.Leader {
...
if len(endpoints) == 0 {
endpoints = []string{"http://127.0.0.1:2379"}
}
...
}
```
因为我们没有etcd在 127.0.0.1:2379 地址上服务**,** 这个任务将永远卡在那儿,关键还不报错**。**
那么`leader.Leader`是什么? 我们为什么需要它呢?先看看他的定义:
```go
// Leader provides leadership election
type Leader interface {
// elect leader
Elect(id string, opts ...ElectOption) (Elected, error)
// follow the leader
Follow() chan string
}
```
本文上一节提到了, 要在分布式环境中成功执行计划任务, 我们需需有领导权选举的能力,而`leader.Leader`就是用来作这个的。在sync.Cron 的内部实现中,会调用此接口的`Elect`方法来获取领导。
```go
func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
id := fmt.Sprintf("%s-%s", s.String(), t.String()) ...
e, err := c.opts.Leader.Elect(id)
...
}
```
**注意:** 我觉得 `Leader` 这个名字取得不好,此接口更适合命名为 `Elector`。
所以我们需要在初始化`Cron`时提供一个`Leader`。我们可以选用etcd进行服务发现, 同时用它来作领导权选举:
```go
import (
...
"github.com/micro/go-micro/sync/leader"
"github.com/micro/go-micro/sync/leader/etcd"
...
)
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.cron.example"), // name the client service
)
// Initialise service
service.Init() // get etcd node list from registry
etcdList := service.Options().Registry.Options().Addrs
// build leader
lead := etcd.NewLeader(leader.Nodes(etcdList...))
cron := sync.NewCron(sync.WithLeader(lead))
cron.Schedule(
task.Schedule{Interval: 10 * time.Second},
task.Command{Name: "foo", Func: func() error {
log.Info("finish command foo")
return nil
}},
) if err := service.Run(); err != nil {
log.Fatal(err)
}
}
```
接下来在启动程序时传入相关参数:
```bash
go run main.go --registry=etcd --registry_address=etcd1.foo.com:2379,etcd2.foo.com:2379,etcd3.foo.com:2379
2020-03-07 09:13:41.916437 I | Transport [http] Listening on [::]:61329
2020-03-07 09:13:41.916541 I | Broker [http] Connected to [::]:61330
2020-03-07 09:13:41.916822 I | Registry [etcd] Registering node: com.foo.cron.example-caaa1f71-2559-431d-9139-df00324250a4
2020-03-07 09:14:13.846014 I | [cron] executing command foo
2020-03-07 09:14:13.846070 I | finish command foo
...
```
这样, 便在Micro中启动了一个计划任务。
**陷井:** Micro 的计划任务依赖etcd。
如果我们再额外启动两个节点(B和C),新节点上不会有计划任务执行, 因为此时最初的节点(A)是主节点。
然后我们可以把节点A关掉,几秒钟后 B或C中的一个将自动成为新的主节点,并开始执行任务。
这样我们达到了较理想的状态: 即没有单点故障,也没有重复执行。
### 改变任务执行的起始时间点
有时我们希望任务在未来一个固定的时间点启动, 这就需要修改`task.Schedule`的`Time`属性
```go
// start from the next New Year's Day
startPoint, _ := time.Parse("2006-01-01", "2021-01-01")
cron.Schedule(
task.Schedule{
Time: startPoint,
Interval: 10 * time.Second,
},
task.Command{Name: "foo", Func: func() error {
log.Info("finish command foo")
return nil
}},
)
```
它会按你预期执行。
如果你想让任务在服务启动后的等一段时间再开始运行, 你需要这么作:
```go
cron.Schedule(
task.Schedule{
Time: time.Now().Add(time.Minute),
Interval: 10 * time.Second,
},
task.Command{Name: "foo", Func: func() error {
log.Info("finish command foo")
return nil
}},
)
```
结果**会让你失望**, 多节点中又会出现重复执行问题。问题的原因存在于实现`sync.Cron` 和 `task.Schedule`的代码中。
```go
//cron.go
func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
id := fmt.Sprintf("%s-%s", s.String(), t.String()) ...
e, err := c.opts.Leader.Elect(id)
...
}
//task.go
func (s Schedule) String() string {
return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval)
}
```
选举id依赖 `Schedule.String()`而 `Schedule.String()`依赖`Schedule.Time`。
当我们选择以服务启动时间为基准的相对时间时, 每个节点将生成各自不同的选举id。 因为它们很难恰巧完全在相同时间点启动。
由此,每个节点自己行成了一个选举组, 自己成为自己组的主节点。
**陷井**: 永远别用相对时间作为任务的初次启动时间。
------
## 结语
在分布式系统中正确地实现计划任务很不容易,不过好在Micro提供了一个简单优雅的方案。尽管它缺少crontab表达式之类的高级特性,这个方案是可以满足我们大部分应用场景的。
**不心点儿陷井,还是可用的**。这个建议既适用于计划任务这个功能, 也适用于整个Micro框架。
至此,本系列文章就结束了。 感谢阅读。
写这个系统对我来说也是个学习的过程。 学习下来,我觉得**Micro是个相当不错的工具, 它值得学习并且真的能帮我们极大地简化分布式系统开发**。
缺点是有的, 不过瑕不掩瑜。 并且它也在持续升级, 我写本系列还是基于v1.18.0 , 现在已经升到v2.2.0了。 我准备去试试新版的功能。**不过, 可别着急在生产环境上使用新版本**。你要是看了我之前的文章,我肯定知道为啥这么说。
以后我还会接着下Micro相关的文章,只是不再系列的形式。 每篇文章都会独立成文,讨论某些特定的话题。
要想在我发新文章时自动得到通知, 可以在Medium 和 Twitter上 Fo 我, 帐号都是 @dche423
回头聊。感谢????
— Dan Che!
有疑问加站长微信联系(非本文作者))