### 前提:
**1.job1优先级高于job2**
**2.job1和job2均需要1秒的执行时间**
**3.每500毫秒启动3个job2,每2秒启动1个job1**
### 结论:
**无论有多少job2,只要来了job1,就会优先执行job1**
```go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
// 定义1个阻塞主go程管道
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
// 定义2个job分发管道
ch1, ch2 := make(chan string), make(chan string)
job1 := "执行job1"
job2 := "执行job2"
// 模拟超时,ctx超时后停止启动任务和执行任务
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
// 启动worker函数,执行管道ch1和ch2收到的任务
go worker(ch1, ch2, ctx)
// 定义2个启动任务的周期定时器
tiker1, tiker2 := time.NewTicker(time.Second*2), time.NewTicker(time.Millisecond*500)
// 每隔500毫秒启动3个job2
go func() {
for {
select {
case <-tiker2.C:
log.Println("启动3个job2...")
for i := 0; i < 3; i++ {
go func() {
ch2 <- job2
}()
}
case <-ctx.Done():
log.Println("收到结束信号,停止启动job2!")
return
}
}
}()
// 每隔2秒启动1个job1
loop:
for {
select {
case <-ctx.Done():
log.Println("收到结束信号,停止启动job1!")
break loop
case <-tiker1.C:
log.Println("启动1个job1...")
go func() {
ch1 <- job1
}()
}
}
// 等待关闭信号
select {
case sig := <-ch:
log.Println("收到关闭信号:", sig)
}
log.Println("finish!")
}
// 按优先级处理任务函数
func worker(ch1, ch2 <-chan string, ctx context.Context) {
defer log.Println("defer close worker")
for {
// 第一个select如果先收到job1,则直接执行job1
select {
case <-ctx.Done():
log.Println("收到结束信号,退出worker函数!")
return
case job1 := <-ch1:
// 模拟job1需要1秒执行结束
log.Println(job1)
time.Sleep(time.Second)
case job2 := <-ch2:
// 第一个select如果先收到job2,则进入第2个select
// 判断此时是否存在未执行的job1,如果job1存在,则执行job1;如果job1不存在,则执行job2
priority:
for {
select {
case <-ctx.Done():
log.Println("执行job2被插队: 收到结束信号,退出worker函数!")
return
case job1 := <-ch1:
// 模拟job1需要1秒执行结束
log.Println("执行job2被插队:", job1)
time.Sleep(time.Second)
default:
break priority
}
}
// 模拟job2需要1秒执行结束
log.Println("此时没有job1:", job2)
time.Sleep(time.Second)
}
}
}
```
#### 日志:
``` go
2020/12/11 23:32:11 启动3个job2...
2020/12/11 23:32:11 此时没有job1: 执行job2
2020/12/11 23:32:12 启动3个job2...
2020/12/11 23:32:12 启动3个job2...
2020/12/11 23:32:12 此时没有job1: 执行job2
2020/12/11 23:32:13 启动1个job1...
2020/12/11 23:32:13 启动3个job2...
2020/12/11 23:32:13 启动3个job2...
2020/12/11 23:32:13 执行job2被插队: 执行job1
2020/12/11 23:32:14 启动3个job2...
2020/12/11 23:32:14 启动3个job2...
2020/12/11 23:32:14 此时没有job1: 执行job2
2020/12/11 23:32:15 启动1个job1...
2020/12/11 23:32:15 启动3个job2...
2020/12/11 23:32:15 启动3个job2...
2020/12/11 23:32:15 执行job2被插队: 执行job1
2020/12/11 23:32:16 启动3个job2...
2020/12/11 23:32:16 启动3个job2...
2020/12/11 23:32:16 此时没有job1: 执行job2
2020/12/11 23:32:17 启动1个job1...
2020/12/11 23:32:17 启动3个job2...
2020/12/11 23:32:17 启动3个job2...
2020/12/11 23:32:17 执行job2被插队: 执行job1
2020/12/11 23:32:18 启动3个job2...
2020/12/11 23:32:18 启动3个job2...
2020/12/11 23:32:18 此时没有job1: 执行job2
2020/12/11 23:32:19 启动1个job1...
2020/12/11 23:32:19 启动3个job2...
2020/12/11 23:32:19 启动3个job2...
2020/12/11 23:32:19 执行job2被插队: 执行job1
2020/12/11 23:32:20 启动3个job2...
2020/12/11 23:32:20 启动3个job2...
2020/12/11 23:32:20 此时没有job1: 执行job2
2020/12/11 23:32:21 启动1个job1...
2020/12/11 23:32:21 启动3个job2...
2020/12/11 23:32:21 收到结束信号,停止启动job1!
2020/12/11 23:32:21 收到结束信号,停止启动job2!
2020/12/11 23:32:21 收到结束信号,退出worker函数!
2020/12/11 23:32:21 defer close worker
2020/12/11 23:32:23 收到关闭信号: interrupt
2020/12/11 23:32:23 finish!
```
你好,请问哪里不安全了,麻烦指出来。 代码这里 https://gitee.com/helloh2o/lucky/blob/master/core/inet/broadcast.go
#3
更多评论
我有用过类似的情景,但我是这样写的。
```
for {
// 优先管理连接
select {
// add conn
case ic := <-bNode.addConnChan:
bNode.Connections[ic.GetUuid()] = ic
bNode.clientSize++
// conn leave
case key := <-bNode.delConnChan:
delete(bNode.Connections, key)
bNode.clientSize--
default:
select {
case pkg := <-bNode.onMessage:
if pkg == nil {
// stop Serve
_ = bNode.Destroy()
return
}
bNode.allMessages = append(bNode.allMessages, pkg)
bNode.broadcast(pkg)
default:
time.Sleep(time.Millisecond * 50)
}
}
}
```
#1