摘自Go语言实战P166第7章
runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(如 iron.io)里执行。
//main.go
// 这个示例程序演示如何使用通道来监视
// 程序运行的时间,以在程序运行时间过长
// 时如何终止程序
package main
import (
"log"
"os"
"time"
"github.com/goinaction/code/chapter7/patterns/runner"
)
// timeout 规定了必须在多少秒内处理完成
const timeout = 3 * time.Second
// main 是程序的入口
func main() {
log.Println("Starting work.")
// 为本次执行分配超时时间
r := runner.New(timeout)
// 加入要执行的任务
r.Add(createTask(), createTask(), createTask())
// 执行任务并处理结果
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
// createTask 返回一个根据 id
// 休眠指定秒数的示例任务
// 用来模拟正在进行工作
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
//runner.go
// Gabriel Aszalos 协助完成了这个示例
// runner 包管理处理任务的运行和生命周期
package runner
import (
"errors"
"os"
"os/signal"
"time"
)
// Runner 在给定的超时时间内执行一组任务,
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
// interrupt 通道报告从操作系统
// 发送的信号
interrupt chan os.Signal
// complete 通道报告处理任务已经完成
complete chan error
// timeout 报告处理任务已经超时
timeout <-chan time.Time
// tasks 持有一组以索引顺序依次执行的
// 函数
tasks []func(int)
}
// ErrTimeout 会在任务执行超时时返回
var ErrTimeout = errors.New("received timeout")
// ErrInterrupt 会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")
// New 返回一个新的准备使用的 Runner
// task 字段的零值是 nil,已经满足初始化的要求
// ,所以没有被明确初始化
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// Add 将一个任务附加到 Runner 上。这个任务是一个
// 接收一个 int 类型的 ID 作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start 执行所有任务,并监视通道事件
func (r *Runner) Start() error {
// 我们希望接收所有中断信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的 goroutine 执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
// 当任务处理完成时发出的信号
case err := <-r.complete:
return err
// 当任务处理程序运行超时时发出的信号
case <-r.timeout:
return ErrTimeout
}
}
// run 执行每一个已注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
// 执行已注册的任务
task(id)
}
return nil
}
// gotInterrupt 验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
select {
// 当中断事件被触发时发出的信号
case <-r.interrupt:
// 停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
// 继续正常运行
default:
return false
}
}
1.通道 interrupt 被初始化为缓冲区容量为 1 的通道。这可以保证通道至少能接收一个来自语言运行时的 os.Signal 值,确保语言运行时发送这个事件的时候不会被阻塞。如果 goroutine没有准备好接收这个值,这个值就会被丢弃。例如,如果用户反复敲 Ctrl+C 组合键,程序只会在这个通道的缓冲区可用的时候接收事件,其余的所有事件都会被丢弃。
2.通道 complete 被初始化为无缓冲的通道。当执行任务的 goroutine 完成时,会向这个通道发送一个 error 类型的值或者 nil 值。之后就会等待 main 函数接收这个值。一旦 main 接收了这个 error 值,goroutine 就可以安全地终止了。
3.最后一个通道 timeout 是用 time 包的 After 函数初始化的。After 函数返回一个time.Time 类型的通道。语言运行时会在指定的 duration 时间到期之后,向这个通道发送一个 time.Time 的值。
4.select default
87 // gotInterrupt 验证是否接收到了中断信号
88 func (r *Runner) gotInterrupt() bool {
89 select {
90 // 当中断事件被触发时发出的信号
91 case <-r.interrupt:
92 // 停止接收后续的任何信号
93 signal.Stop(r.interrupt)
95 return true
96
97 // 继续正常运行
98 default:
99 return false
100 }
101 }
在第 91 行,代码试图从 interrupt 通道去接收信号。一般来说,select 语句在没有任何要接收的数据时会阻塞,不过有了第 98 行的 default 分支就不会阻塞了。default 分支会将接收 interrupt 通道的阻塞调用转变为非阻塞的。如果 interrupt 通道有中断信号需要接收,就会接收并处理这个中断。如果没有需要接收的信号,就会执行 default 分支。当收到中断信号后,代码会通过在第 93 行调用 Stop 方法来停止接收之后的所有事件。之后函数返回 true。如果没有收到中断信号,在第 99 行该方法会返回 false。本质上,gotInterrupt 方法会让 goroutine 检查中断信号,如果没有发出中断信号,就继续处理工作。
有疑问加站长微信联系(非本文作者)