开了goroutine如何才能退出呢?如何控制它并了解其生命周期?这里包括主动退出和外部的被动退出
一. 如何退出goroutine?
1. 主动自己退出:
- 经过多少时间后主动退出 (channel)
func main() {
var wg sync.WaitGroup
quit := time.Tick(time.Second * 2) // 给个定时器
wg.Add(1)
go func() {
defer wg.Done()
<-quit
fmt.Println("over goroutine")
return
}()
wg.Wait()
fmt.Println("over all")
}
- 通过context通知goroutine退出:
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
fmt.Println("over goroutine")
return
}()
wg.Wait()
fmt.Println("over all")
}
2. 外部被动退出:
我们可以通过外部操作接收信号方式来退出:
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
fmt.Println("over")
return
}()
wg.Wait()
}
二. goroutine的生命周期之runner并发模式
ruuner 用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。
当开发者需要调度后台吹了任务的程序的时候,这种模式会很有用。这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(iron.io)里执行。
在设计上,可以实现以下几点:
程序可以在分配的时间内完成工作,正常终止;
程序没有及时完成工作,“自杀”;
接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作。
完整代码实现如下:
//统一错误处理
//超时错误信息,会在任务执行超时时返回
var ErrTimeOut = errors.New("received timeout")
//中断错误信号,会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")
// Runner 在给定的超时时间内执行一组任务
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
//从操作系统发送信号
interrupt chan os.Signal
//报告处理任务已完成
complete chan error
//报告处理任务已经超时
timeout <- chan time.Time
//持有一组以索引为顺序的依次执行的以int类型id为参数的函数
tasks [] func(id int)
}
// New 函数返回一个新的准备使用的Runner,d:自定义分配的时间
// 初始化结构体参数
func New(d time.Duration) *Runner{
return &Runner{
interrupt:make(chan os.Signal,1),
complete:make(chan error),
//会在另一线程经过时间段d后向返回值发送当时的时间。
// 因为 task 字段的零值是 nil,已经满足初始化的要求,所以没有被明确初始化。
timeout:time.After(d),
}
}
//将一个任务加入到Runner中
func (r *Runner)Add(tasks ...func(id int)) {
r.tasks = append(r.tasks,tasks...)
}
//开始执行所有任务,并监控通道事件
func (r *Runner)Start()error {
//监控所有的中断信号
signal.Notify(r.interrupt,os.Interrupt)
//使用不同的goroutine执行不同的任务
go func() {
r.complete <- r.run()
}()
//使用 select 语句来监控goroutine的通信
select {
//等待任务完成
case err := <- r.complete:
return err
//任务超时
case <- r.timeout:
return ErrTimeOut
}
}
//执行每一个已注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
//检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
//执行已注册的任务
task(id)
}
return nil
}
//检测是否收到了中断信号
func (r *Runner)gotInterrupt()bool {
select {
//当中断事件被触发时
case <- r.interrupt:
//停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
//继续执行
default:
return false
}
}
main.go演示Runner测试
// timeout 规定了必须在多少秒内处理完成
const timeout = 3 * time.Second
func main() {
log.Println("Starting work.")
// 为本次执行分配超时时间
r := New(timeout)
// 加入要执行的任务
r.Add(createTask(), createTask(), createTask())
// 执行任务并处理结果
if err := r.Start(); err != nil {
switch err {
case ErrTimeOut:
log.Println("Terminating due to timeout.")
os.Exit(1)
case ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
//createTask 返回一个根据 id 休眠指定秒数的示例任务
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
有疑问加站长微信联系(非本文作者)