Micro In Action(尾声): 分布式计划任务

polaris · 2020-03-28 15:53:57 · 2246 次点击 · 预计阅读时间 10 分钟 · 大约8小时之前 开始浏览    
这是一个创建于 2020-03-28 15:53:57 的文章,其中的信息可能已经有所发展或是发生改变。

Micro In Action(尾声): 分布式计划任务

Micro In Action

本文作者:Che Dan

原文链接:https://medium.com/@dche423/micro-in-action-9-cron-job-dabec09058e1

本文是《Micro In Acton》 系列的最后一篇。

我们在前面的文章中已经探讨过了利于Micro创建和使用微服务的方方面面。 今天到了最后一个话题: 计划任务。

要作好发布式计划任务不容易

基本上每个系统都需要计划任务。 它们用来执行一些后台任务, 有的是在特点时间点运行一次, 有的是定期重复执行。

如果系统里只有一个节点, 那么事情非常简单直接。我们把Web服务和后台程序都放到这个节点上, 只要这个节点是正常的, 那么后台程序就可以顺利执行。

8-1.png

但是当今的系统很少是单节点就可以搞定的, 我们需要考虑系统的鲁棒性和可扩展性。当系统的节点数是一个以后, 我们就不得不面对两个互相矛盾的问题: 重复执行单点故障

8-2.png

如果我们不改代码,只是往系统中填加更多完全相同的副本节点,如上图所示,必然会导致重复执行(因为每个节点中包含相同的后台程序)。要避免这些重复, 我们需要写代码去小心处理顺序执行逻辑,这很容易出错。

有些“聪明人”会调整架构。 他们把计划任务要执行的逻辑并入到Web服务中,用HTTP API 或gRPC API 把这些功能暴露出来。然后用操作系统的计划任务(例如crontab)来调用这些API,完成同样的功能。

8-3.png

这个架构的好处是结构简单,并且可以避免重复执行。 操作系统触发计划任务把请求发给负载均衡, 然后负载均衡会把请求转到集群中的某一个节点。

但它还是无法避免单点故障。 如果运行计划任务的那台服务器出了问题, 任务还是无法执行。

还有一些其它的替代方法, 大多无法同时解决前面提到的两个问题。

比如为后台程序单独部署一个节点, 仍存在单点故障问题。

8-4.png

比如为后台程序部署多个节点,仍会发生重复执行。

8-5.png

所以应该怎么解决这个困难吗? 说到底,我们是遇到了分布式计算中一个经典问题: 领导权选举问题

我们希望保持所有的服务节点都是完全相同(至少大部分时间如此)并无状态的。 这样就可以保证整体系统可以方便地横向扩展。但有些时间 , 我们必须在这些对等节点中选出一个主节点, 用于执行少量关键任务(例如计划任务)。

分布式选举的方案并不需要自己从头实现, 很多服务发现系统都内置提供这种能力, 例如etcd, zookeeper 等等都有此功能。

有了分布式选举,就可以保证我们的计划任务只在单一的主节点上运行(因此避免了重复执行)。而如果当前主节点因为某种原因宕机了, 其它节点会自动共同选举出新的主节点,执行后续的任务。(因为没有单点故障)

8-6.png

这其实就是Micro中处理计划任务所采用的方法。


Micro 中的计划任务

Micro 通过github.com/micro/go-micro/sync 这个包提供了计划任务的功能。

不过, 与其它很多Micro功能一样,这是个“隐藏”功能。 没文档,没示例也没有测试用例可以参考。自己看代码学习,使用后果自负????.

关于这个功能, 本文可能是目前全网唯一一份资料

关键组件

此功能由几个接口与类组成。

其中最重要的是 sync.Cron接口:

// Cron is a distributed scheduler using leader election
// and distributed task runners. It uses the leader and
// task interfaces.
type Cron interface {
   Schedule(task.Schedule, task.Command) error
}

这个接口只包含一个方法Schedule,它接受俩参数。

第一个参数 task.Schedule 代表任务的时间计划。

// Schedule represents a time or interval at which a task should run
type Schedule struct {
   // When to start the schedule. Zero time means immediately
   Time time.Time
   // Non zero interval dictates an ongoing schedule
   Interval time.Duration
}

这个类有两个属性, 一是任务执行的起始时间点, 另一个重是执行间隔间事。

方法的第二个参数 task.Command 定义了任务要执行的命令, 包含了名称和一个函数指针(指定具体的执行代码)。

// Command to be executed
type Command struct {
   Name string
   Func func() error
}

设置计划任务

了解了上面的组件,可以很容易地创建并设置一个计划任务。

package mainimport (
   "time"   "github.com/micro/go-micro"
   "github.com/micro/go-micro/sync"
   "github.com/micro/go-micro/sync/task"
   "github.com/micro/go-micro/util/log"
)
func main() {
   // New Service
   service := micro.NewService(
      micro.Name("com.foo.cron.example"), // name the client service
   )
   // Initialise service
   service.Init()   cron := sync.NewCron()
   cron.Schedule(
      task.Schedule{Interval: 10 * time.Second},
      task.Command{Name: "foo", Func: func() error {
         log.Debug("finish command foo")
         return nil
      }},
   )
   if err := service.Run(); err != nil {
      log.Fatal(err)
   }
}

像往常一样, 先创建并初始化一个service,然后用sync.NewCron()创建一个Cron接口。然后用cron.Schedule创建设置一个简单地任务,它会每隔10秒在控制台输出一行日志。

下面把程序跑起来:

go run main.go2020-03-06 16:47:43.696426 I | Transport [http] Listening on [::]:62880
2020-03-06 16:47:43.696518 I | Broker [http] Connected to [::]:62882
2020-03-06 16:47:43.696808 I | Registry [mdns] Registering node: com.foo.cron.example-eb3a033f-4a33-438e-a467-6d2978027f2b

你可能预期每隔10秒会看到一些日志输出,但结果是啥也没有。不报错,也没有日志。 为啥呢?

我们知道Micro有个约定: 所有关键组件都有内置的轻量组实现。 比如有基于mDNS的注册组件, 有基于HTTP的Broker实现。 有了他们,我们可以不安装任何外部依赖,在本地完成开发。

但计划任务这个功能打破了上述约定,想使用它, 必须依赖etcd。看看sync.NewCron的源码便知道是怎么回事儿了:

func NewCron(opts ...Option) Cron {
   ...
   if options.Leader == nil {
      options.Leader = etcd.NewLeader()
   }
   ...
}

如果调用时没有提供Leader选项(它的类型为leader.Leader),那么会创建一个默认的(etcd.NewLeader)。就是这一行使得我们必须依赖etcd。如果再进一步看看etcd.NewLeader的代码, 就能找到根源;

func NewLeader(opts ...leader.Option) leader.Leader {
   ...
   if len(endpoints) == 0 {
      endpoints = []string{"http://127.0.0.1:2379"}
   }
   ...
}

因为我们没有etcd在 127.0.0.1:2379 地址上服务, 这个任务将永远卡在那儿,关键还不报错

那么leader.Leader是什么? 我们为什么需要它呢?先看看他的定义:

// Leader provides leadership election
type Leader interface {
   // elect leader
   Elect(id string, opts ...ElectOption) (Elected, error)
   // follow the leader
   Follow() chan string
}

本文上一节提到了, 要在分布式环境中成功执行计划任务, 我们需需有领导权选举的能力,而leader.Leader就是用来作这个的。在sync.Cron 的内部实现中,会调用此接口的Elect方法来获取领导。

func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
   id := fmt.Sprintf("%s-%s", s.String(), t.String())   ...
         e, err := c.opts.Leader.Elect(id)
   ...
}

注意: 我觉得 Leader 这个名字取得不好,此接口更适合命名为 Elector

所以我们需要在初始化Cron时提供一个Leader。我们可以选用etcd进行服务发现, 同时用它来作领导权选举:

import (
   ...
   "github.com/micro/go-micro/sync/leader"
   "github.com/micro/go-micro/sync/leader/etcd"
   ...
)
func main() {
   // New Service
   service := micro.NewService(
      micro.Name("com.foo.cron.example"), // name the client service
   )
   // Initialise service
   service.Init()   // get etcd node list from registry
   etcdList := service.Options().Registry.Options().Addrs
   // build leader
   lead := etcd.NewLeader(leader.Nodes(etcdList...))

   cron := sync.NewCron(sync.WithLeader(lead))
   cron.Schedule(
      task.Schedule{Interval: 10 * time.Second},
      task.Command{Name: "foo", Func: func() error {
         log.Info("finish command foo")
         return nil
      }},
   )   if err := service.Run(); err != nil {
      log.Fatal(err)
   }
}

接下来在启动程序时传入相关参数:

go run main.go --registry=etcd --registry_address=etcd1.foo.com:2379,etcd2.foo.com:2379,etcd3.foo.com:2379
2020-03-07 09:13:41.916437 I | Transport [http] Listening on [::]:61329
2020-03-07 09:13:41.916541 I | Broker [http] Connected to [::]:61330
2020-03-07 09:13:41.916822 I | Registry [etcd] Registering node: com.foo.cron.example-caaa1f71-2559-431d-9139-df00324250a4
2020-03-07 09:14:13.846014 I | [cron] executing command foo
2020-03-07 09:14:13.846070 I | finish command foo
...

这样, 便在Micro中启动了一个计划任务。

陷井: Micro 的计划任务依赖etcd。

如果我们再额外启动两个节点(B和C),新节点上不会有计划任务执行, 因为此时最初的节点(A)是主节点。

然后我们可以把节点A关掉,几秒钟后 B或C中的一个将自动成为新的主节点,并开始执行任务。

这样我们达到了较理想的状态: 即没有单点故障,也没有重复执行。

改变任务执行的起始时间点

有时我们希望任务在未来一个固定的时间点启动, 这就需要修改task.ScheduleTime属性

// start from the next New Year's Day
startPoint, _ := time.Parse("2006-01-01", "2021-01-01")
cron.Schedule(
   task.Schedule{
      Time:     startPoint,
      Interval: 10 * time.Second,
   },
   task.Command{Name: "foo", Func: func() error {
      log.Info("finish command foo")
      return nil
   }},
)

它会按你预期执行。

如果你想让任务在服务启动后的等一段时间再开始运行, 你需要这么作:

cron.Schedule(
   task.Schedule{
      Time:     time.Now().Add(time.Minute),
      Interval: 10 * time.Second,
   },
   task.Command{Name: "foo", Func: func() error {
      log.Info("finish command foo")
      return nil
   }},
)

结果会让你失望, 多节点中又会出现重复执行问题。问题的原因存在于实现sync.Crontask.Schedule的代码中。

//cron.go
func (c *syncCron) Schedule(s task.Schedule, t task.Command) error {
   id := fmt.Sprintf("%s-%s", s.String(), t.String())   ...
         e, err := c.opts.Leader.Elect(id)
   ...
}
//task.go
func (s Schedule) String() string {
   return fmt.Sprintf("%d-%d", s.Time.Unix(), s.Interval)
}

选举id依赖 Schedule.String()Schedule.String()依赖Schedule.Time

当我们选择以服务启动时间为基准的相对时间时, 每个节点将生成各自不同的选举id。 因为它们很难恰巧完全在相同时间点启动。

由此,每个节点自己行成了一个选举组, 自己成为自己组的主节点。

陷井: 永远别用相对时间作为任务的初次启动时间。


结语

在分布式系统中正确地实现计划任务很不容易,不过好在Micro提供了一个简单优雅的方案。尽管它缺少crontab表达式之类的高级特性,这个方案是可以满足我们大部分应用场景的。

不心点儿陷井,还是可用的。这个建议既适用于计划任务这个功能, 也适用于整个Micro框架。

至此,本系列文章就结束了。 感谢阅读。

写这个系统对我来说也是个学习的过程。 学习下来,我觉得Micro是个相当不错的工具, 它值得学习并且真的能帮我们极大地简化分布式系统开发

缺点是有的, 不过瑕不掩瑜。 并且它也在持续升级, 我写本系列还是基于v1.18.0 , 现在已经升到v2.2.0了。 我准备去试试新版的功能。不过, 可别着急在生产环境上使用新版本。你要是看了我之前的文章,我肯定知道为啥这么说。

以后我还会接着下Micro相关的文章,只是不再系列的形式。 每篇文章都会独立成文,讨论某些特定的话题。

要想在我发新文章时自动得到通知, 可以在Medium 和 Twitter上 Fo 我, 帐号都是 @dche423

回头聊。感谢????

— Dan Che!


有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2246 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
上一篇:leetcode_820
1 回复  |  直到 2020-04-27 20:22:43
louv
louv · #1 · 5年之前

2.5 又把 cron 功能去掉了。。

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传