接入服务器与后端业务服务器之间维持 TCP 连接,多个前端请求通过接入服务器访问后端业务服务器。接入服务器可以方便地增加路由功能:维护多个业务服务器,并根据消息 ID 将请求路由到具体的业务服务器。
项目目录如下
simplelotus
    src
        lotus
            main.go
        lotuslib
            tcplotus.go
        test
            tcpclient.go
            tcpserver.go
    install
install源码如下:
#!/usr/bin/env bash
# Build script for the project: formats everything under src/ and installs
# the "lotus" command, using the current directory as GOPATH.
#
# Must be run from the directory that contains this script, because GOPATH
# is derived from the current working directory.

if [ ! -f install ]; then
    echo 'install must be run within its container folder' 1>&2
    exit 1
fi

CURDIR="$(pwd)"
OLDGOPATH="$GOPATH"
export GOPATH="$CURDIR"

# Remember the first failure instead of ignoring it, so that a broken build
# is not reported as 'finished' with exit status 0.
status=0
gofmt -w src || status=$?
go install lotus || status=$?

# Restore the caller's GOPATH whether or not the build succeeded.
export GOPATH="$OLDGOPATH"

if [ "$status" -ne 0 ]; then
    echo 'install failed' 1>&2
    exit "$status"
fi

echo 'finished'
main.go
package main import ( "lotuslib" ) const ( ip = "0.0.0.0" port = 1987 ) func main() { tcplotus.TcpLotusMain(ip, port) }
tcplotus.go(和上游维持tcp连接)
// Package tcplotus implements a small TCP access server: it accepts many
// front-end connections and multiplexes their messages over one long-lived
// TCP connection to an upstream business server, routing each JSON response
// back to the front-end connection identified by its reqId.
package tcplotus

import (
	"encoding/json"
	"log"
	"net"
	"strconv"
	"sync"
	"time"
)

const (
	// proxy_timeout is the number of seconds a handler waits when queueing a
	// request for the upstream server and when waiting for its response.
	proxy_timeout = 5
	// proxy_server is the address of the upstream business server.
	proxy_server = "127.0.0.1:1988"
	// msg_length is the size in bytes of the per-connection read buffer.
	msg_length = 1024
)

// Request is one front-end message travelling to the upstream server.
type Request struct {
	reqId      int           // id of the front-end connection that sent it
	reqContent string        // raw bytes read from the front-end connection
	rspChan    chan<- string // write-only channel on which the response is delivered
}

var (
	// requestMap maps a connection id to the Request registered by its
	// handler, so that ProxyRecvLoop can find the response channel.
	requestMap map[int]*Request
	// requestMapMu guards requestMap, which is touched by every handler
	// goroutine and by ProxyRecvLoop.
	requestMapMu sync.Mutex
)

// Clienter holds the shared connection to the upstream server.
type Clienter struct {
	client  net.Conn
	isAlive bool
	SendStr chan *Request // requests queued for the upstream server
	RecvStr chan string   // optional status notifications; may have no reader

	mu sync.Mutex // guards client and isAlive
}

// Connect dials the upstream server unless a live connection already exists.
// It reports whether a live connection is available afterwards.
func (c *Clienter) Connect() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isAlive {
		return true
	}
	conn, err := net.Dial("tcp", proxy_server)
	if err != nil {
		return false
	}
	c.client = conn
	c.isAlive = true
	log.Println("connect to " + proxy_server)
	return true
}

// conn returns the current upstream connection, or nil when disconnected.
func (c *Clienter) conn() net.Conn {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.isAlive {
		return nil
	}
	return c.client
}

// drop closes conn and marks the client disconnected, but only if conn is
// still the current connection. The identity check keeps a late error on an
// old connection from closing a connection that was re-dialled meanwhile.
func (c *Clienter) drop(conn net.Conn) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.isAlive || c.client != conn {
		return
	}
	conn.Close()
	c.isAlive = false
	log.Println("disconnect from " + proxy_server)
}

// ProxySendLoop forwards queued requests to the upstream server as JSON
// objects of the form {"reqId": "...", "reqContent": "..."}. While the
// upstream connection is down it retries the dial once per second. It never
// returns.
func ProxySendLoop(c *Clienter) {
	for {
		if c.conn() == nil {
			time.Sleep(1 * time.Second)
			c.Connect()
			continue
		}
		req := <-c.SendStr
		// The connection may have been replaced or lost while this
		// goroutine was blocked on the queue, so look it up again.
		conn := c.conn()
		if conn == nil {
			// The handler that queued req will report a receive timeout.
			continue
		}
		sendjson, err := json.Marshal(map[string]string{
			"reqId":      strconv.Itoa(req.reqId),
			"reqContent": req.reqContent,
		})
		if err != nil {
			continue
		}
		if _, err = conn.Write(sendjson); err != nil {
			// RecvStr is unbuffered and may have no reader; a blocking
			// send here would stall this loop forever.
			select {
			case c.RecvStr <- "proxy server close...":
			default:
			}
			c.drop(conn)
		}
	}
}

// ProxyRecvLoop reads JSON responses of the form
// {"reqId": "...", "resContent": "..."} from the upstream server and hands
// each resContent to the handler registered under reqId. While the upstream
// connection is down it retries the dial once per second. It never returns.
func ProxyRecvLoop(c *Clienter) {
	for {
		conn := c.conn()
		if conn == nil {
			time.Sleep(1 * time.Second)
			c.Connect()
			continue
		}
		// TCP is a byte stream: one read may carry several responses or
		// only part of one. A streaming decoder splits the stream on JSON
		// value boundaries instead of assuming one object per read.
		dec := json.NewDecoder(conn)
		for {
			var recvdata map[string]string
			if err := dec.Decode(&recvdata); err != nil {
				// After an I/O or syntax error the stream position is
				// unknown, so the connection is dropped and re-dialled.
				c.drop(conn)
				break
			}
			reqid, err := strconv.Atoi(recvdata["reqId"])
			if err != nil {
				continue
			}
			requestMapMu.Lock()
			req, ok := requestMap[reqid]
			requestMapMu.Unlock()
			if !ok {
				continue
			}
			// Never block on a handler: if it has not consumed an earlier
			// response, drop this one rather than stall every connection.
			select {
			case req.rspChan <- recvdata["resContent"]:
			default:
			}
		}
	}
}

// handle serves one front-end connection: every message read from conn is
// queued for the upstream server and the matching response (or a timeout
// notice) is written back. id identifies the connection in requestMap.
// The connection is closed and unregistered when handle returns.
func handle(conn *net.TCPConn, id int, tc *Clienter) {
	data := make([]byte, msg_length)
	// Buffered so that ProxyRecvLoop can deliver without waiting for us.
	handleProxy := make(chan string, 1)

	requestMapMu.Lock()
	requestMap[id] = &Request{reqId: id, rspChan: handleProxy}
	requestMapMu.Unlock()

	defer func() {
		conn.Close()
		requestMapMu.Lock()
		delete(requestMap, id)
		requestMapMu.Unlock()
	}()

	for {
		n, err := conn.Read(data)
		if err != nil {
			log.Println("disconnect from " + conn.RemoteAddr().String())
			return
		}

		// Discard a response that arrived after an earlier request timed
		// out. Responses carry only the connection id, so one that arrives
		// later than this point still cannot be told apart from the answer
		// to the new request.
		select {
		case <-handleProxy:
		default:
		}

		// A fresh Request per message: a queued one is never mutated.
		request := &Request{reqId: id, reqContent: string(data[0:n]), rspChan: handleProxy}

		// Queue the request for the upstream server.
		select {
		case tc.SendStr <- request:
		case <-time.After(proxy_timeout * time.Second):
			if _, err = conn.Write([]byte("proxy server send timeout.")); err != nil {
				return
			}
			continue
		}

		// Wait for the response.
		select {
		case rspContent := <-handleProxy:
			if _, err = conn.Write([]byte(rspContent)); err != nil {
				return
			}
		case <-time.After(proxy_timeout * time.Second):
			if _, err = conn.Write([]byte("proxy server recv timeout.")); err != nil {
				return
			}
		}
	}
}

// TcpLotusMain listens on ip:port, starts the upstream send and receive
// loops, and serves every accepted front-end connection in its own
// goroutine. It terminates the process if the port cannot be opened and
// otherwise never returns.
func TcpLotusMain(ip string, port int) {
	// Start the TCP server.
	listen, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
	if err != nil {
		log.Fatalln("listen port error", err)
	}
	log.Println("start tcp server " + ip + " " + strconv.Itoa(port))
	defer listen.Close()

	// The registry must exist before any goroutine can look at it.
	requestMapMu.Lock()
	requestMap = make(map[int]*Request)
	requestMapMu.Unlock()

	// Start the upstream connection and its loops.
	var tc Clienter
	tc.SendStr = make(chan *Request, 1000)
	tc.RecvStr = make(chan string)
	tc.Connect()
	go ProxySendLoop(&tc)
	go ProxyRecvLoop(&tc)

	// Accept front-end connections.
	id := 0
	for {
		conn, err := listen.AcceptTCP()
		if err != nil {
			log.Println("receive connection failed")
			continue
		}
		id++
		log.Println("connected from " + conn.RemoteAddr().String())
		go handle(conn, id, &tc)
	}
}
测试代码如下:
tcpserver.go
// Test upstream server: answers every JSON request from the access server
// with a greeting that echoes the request content.
package main

import (
	"encoding/json"
	"fmt"
	"net"
)

const (
	// msg_length is the size in bytes of the per-connection read buffer.
	msg_length = 1024
)

// Echo serves one connection. Each chunk read from c is parsed as a JSON
// object {"reqId": "...", "reqContent": "..."} and answered with
// {"reqId": <same>, "resContent": "Hello " + reqContent}. Chunks that are not
// valid JSON are ignored. Echo closes c and returns on the first read or
// write error.
func Echo(c net.Conn) {
	defer c.Close()
	data := make([]byte, msg_length)
	for {
		n, err := c.Read(data)
		if err != nil {
			fmt.Printf("read message from lotus failed")
			return
		}
		// A fresh map per message: decoding into a reused map would keep
		// keys from the previous request when the new one omits them.
		recvdata := make(map[string]string, 2)
		if err := json.Unmarshal(data[0:n], &recvdata); err != nil {
			continue
		}
		sendjson, err := json.Marshal(map[string]string{
			"reqId":      recvdata["reqId"],
			"resContent": "Hello " + recvdata["reqContent"],
		})
		if err != nil {
			continue
		}
		if _, err = c.Write(sendjson); err != nil {
			fmt.Printf("disconnect from lotus server")
			return
		}
	}
}

// main listens on :1988 and serves every accepted connection with Echo in
// its own goroutine.
func main() {
	fmt.Printf("Server is ready...\n")
	l, err := net.Listen("tcp", ":1988")
	if err != nil {
		fmt.Printf("Failure to listen: %s\n", err.Error())
		// Without a listener there is nothing to accept on; continuing
		// would dereference a nil listener.
		return
	}
	defer l.Close()
	for {
		if c, err := l.Accept(); err == nil {
			go Echo(c) // one goroutine per connection
		}
	}
}
tcpclient.go
// Interactive test client: sends each line typed on stdin to the access
// server and prints the reply.
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"time"
)

// Clienter wraps the connection to the access server together with the
// channels used to pass one line in and one reply out.
type Clienter struct {
	client  net.Conn
	isAlive bool
	SendStr chan string // line to send
	RecvStr chan string // reply, or a "Server close..." notice
}

// Connect dials the access server at 127.0.0.1:1987 unless the client is
// already connected. It reports whether a connection is available.
func (c *Clienter) Connect() bool {
	if c.isAlive {
		return true
	}
	conn, err := net.Dial("tcp", "127.0.0.1:1987")
	if err != nil {
		fmt.Printf("Failure to connet:%s\n", err.Error())
		return false
	}
	c.client = conn
	c.isAlive = true
	return true
}

// closeAndReport closes the connection, marks the client disconnected and
// then publishes the "Server close..." notice. The state is updated before
// the notice is sent so that whoever receives it observes isAlive == false.
func (c *Clienter) closeAndReport() {
	c.client.Close()
	c.isAlive = false
	c.RecvStr <- "Server close..."
}

// Echo takes one line from SendStr, writes it to the server, reads one reply
// and, after a one second pause, publishes it on RecvStr. If the write or the
// read fails the connection is closed and "Server close..." is published
// instead.
func (c *Clienter) Echo() {
	line := <-c.SendStr
	if _, err := c.client.Write([]byte(line)); err != nil {
		c.closeAndReport()
		return
	}
	buf := make([]byte, 1024)
	n, err := c.client.Read(buf)
	if err != nil {
		c.closeAndReport()
		return
	}
	time.Sleep(1 * time.Second)
	c.RecvStr <- string(buf[0:n])
}

// Work performs one request/reply exchange, reconnecting first if needed.
// When the server cannot be reached the pending line is consumed and
// "Server close..." is published so the caller is never left waiting.
func Work(tc *Clienter) {
	if !tc.isAlive && !tc.Connect() {
		<-tc.SendStr
		tc.RecvStr <- "Server close..."
		return
	}
	tc.Echo()
}

// main reads lines from stdin until a read error (including end of input),
// sending each line through a Work goroutine and printing the reply.
func main() {
	var tc Clienter
	tc.SendStr = make(chan string)
	tc.RecvStr = make(chan string)
	if !tc.Connect() {
		return
	}
	r := bufio.NewReader(os.Stdin)
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			fmt.Printf("bye bye!\n")
			return
		}
		go Work(&tc)
		tc.SendStr <- line
		s := <-tc.RecvStr
		fmt.Printf("back:%s\n", s)
	}
}
有疑问加站长微信联系(非本文作者)