package main
import (
"fmt"
"net"
"time"
"os"
"os/signal"
"sync"
)
const (
WorkerCount = 2
)
type Task struct {
Id int32
Message string
}
var wg sync.WaitGroup
var taskChannel = make(chan Task)
var signChannel = make(chan os.Signal, 1)
var exitChanel = make(chan int)
func main() {
go installSign()
tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:2202")
if err != nil {
panic("解析ip地址失败: " + err.Error())
}
fmt.Println("Listening 127.0.0.1:2202 ....")
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
panic("监听TCP失败: " + err.Error())
}
fmt.Println("Listen success on 127.0.0.1:2202 with tcp4")
defer func() {
fmt.Println("Close listenning ....")
listener.Close()
fmt.Println("Shutdown")
}()
connChannel := make(chan net.Conn)
go accept(listener, connChannel)
go handleConn(connChannel)
go taskDispatch()
for {
select {
case <- signChannel:
fmt.Println("Get shutdown sign")
go notifyGoroutingExit()
goto EXIT
}
}
EXIT:
fmt.Println("Waiting gorouting exit ....")
wg.Wait()
}
func accept(listener * net.TCPListener, connChannel chan net.Conn) {
for {
connection, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept 失败: " + err.Error())
} else {
connChannel <- connection
}
}
}
func handleConn(connChannel chan net.Conn) {
fmt.Println("Wating connection ....")
for {
select {
case conn := <- connChannel:
remoteAddr := conn.RemoteAddr()
fmt.Println("Client " + remoteAddr.String() + " connected")
readConn(&conn)
}
}
}
func readConn(conn *net.Conn) {
for {
(*conn).SetReadDeadline(time.Now().Add(5 * time.Second))
buf := make([]byte, 1024)
_, err := (*conn).Read(buf)
if err != nil {
fmt.Println("Read connection error: " + err.Error())
if err.Error() == "EOF" {
(*conn).Close();
fmt.Println("Close connection " + (*conn).RemoteAddr().String())
break
}
}
if buf != nil {
fmt.Printf("Read message from connect: %s", string(buf))
writeConn(conn, buf)
var task Task
task.Id = 1
task.Message = string(buf)
taskChannel <- task
}
}
}
func writeConn(conn *net.Conn, msg []byte) {
_, err := (*conn).Write(msg)
if err != nil {
fmt.Println("Write connection error: " + err.Error())
if err.Error() == "EOF" {
(*conn).Close();
fmt.Println("Close connection " + (*conn).RemoteAddr().String())
}
}
}
func taskDispatch() {
fmt.Println("Init task moniter ....")
for i := 0; i < WorkerCount; i ++ {
go loop()
}
fmt.Println("Init task moniter DONE!")
}
func loop() {
ticker := time.NewTicker(10 * time.Second)
wg.Add(1)
defer func() {
defer wg.Done()
defer ticker.Stop()
}()
for {
fmt.Println("Wating task ....")
select {
case task := <- taskChannel:
fmt.Println("Task comming: " + task.Message)
break;
case <- exitChanel:
fmt.Println("Woker get exit sign")
goto STOP
//default:
}
// Epoll, 去读任务数据, 不需要处理超时的情况
//select {
// case <- ticker.C:
// fmt.Println(time.Now().String() + " No task after 10 second")
// break;
//}
}
STOP:
//TODO: Clear undo task
}
func installSign() {
signal.Notify(signChannel, os.Interrupt, os.Kill)
}
func notifyGoroutingExit() {
for i := 0; i < WorkerCount; i ++ {
exitChanel <- 1
}
}
脑洞清奇,人家都是每个链接开一个go, 你倒好,所有链接的读取都放到 一个 readConn 里面, 假设100个链接有2~3个 网络不好,的那所有链接的读取都得等待 这2~3个高延时的链接了。
#1