Golang实现并发聊天室

Pyvago · · 1193 次点击 · · 开始浏览    
## 前言以及项目简介 Golang是一门极为出色的语言,近些年也越发火热。每一种语言火起来都有它的道理,Golang就是以其独树一帜的并发处理和高性能博得了众多程序员的青睐,不少的C/C++、Java、PHP开发者都已经或逐渐转型扑向Go语言的怀抱。 从当初刚刚接触Go到现在大概有25天了,一直在看某马的培训视频,也确实学到了不少东西。这个并发聊天室就是他们GO语言与区块链就业班的**阶段性学习项目**。该项目处于整个课程中的第二阶段——并发编程与网络编程,这个并发聊天室就作为此阶段的收尾项目,可见其不容小觑的学习意义和价值。 整个项目的讲解视频共有12节,平均每节十几分钟。我只是看了前三节,包括这个聊天室的功能简介,老师会把项目完整运行一遍,给大家展示其具备的所有功能,还有这个项目的核心部分——并发处理机制,当时我就是在这里反复看了好几遍。然后我就开始做这个项目了,也没去看他给的源码里所有模块的具体实现。感觉有了大体的方向以后我就可以先尝试自己走,到项目这里跟老师跟太紧的话容易给自己弄晕。 历时不到两天也算是把这个项目做完了,我做的时候给分成了两个阶段,第一阶段完成了核心功能:用户进入聊天室后把用户进入的信息广播发送给其他用户,还有广播每位用户发送的信息。这一阶段参照了老师给的源码里manager()这个函数;第二阶段就是剩下的功能模块,像用户改名、用户退出、超时处理这些,在完成超时处理的时候参照了源码里select监听超时这段代码块的位置。 所以说这个项目基本全是我一个字一个字码上去的,至于老师给的源码我也只是参照了一小部分,到现在我也没运行过老师给的源码,而且老师的课程和源码都只有服务器的,没有客户端,我是都做出来了的。本文后面会把我的源码和老师给的源码都贴出来。 ## 门槛 然后说一下这个并发聊天室项目的门槛,或者说它适合什么样水平的人学习。整个项目涉及到的知识有:分支、循环、函数、map、结构体、并发编程、网络编程、select超时处理等等。项目虽然不大,但是涵盖了不少的基础知识,所以非常适合刚看完一本入门书、学完基础的人拿过来练手。至于大佬可以多批评,提提建议,或者直接无视。 ## 项目演示截图 先给大家看一下这个聊天室运行起来后是什么样子,大致有个形象地认识。 视频中的演示截图: ![在这里插入图片描述](https://img-blog.csdnimg.cn/20191220145547120.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21pbmdtaW5nMDUwOA==,size_16,color_FFFFFF,t_70) 视频里的演示就是这个样子的,是不是看起来一点都不像聊天室?哈哈哈哈哈哈,接下来看看**我做的聊天室**: 我演示的时候同时开了三个客户端,用户名分别是用的马化腾、马云、周鸿祎,截图的时候只截了马化腾和周鸿祎的,这些已经能够完整地展示出聊天室支持的各个功能。 马化腾的客户端: ![在这里插入图片描述](https://img-blog.csdnimg.cn/2019122015115090.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21pbmdtaW5nMDUwOA==,size_16,color_FFFFFF,t_70) ![在这里插入图片描述](https://cdn.learnku.com/uploads/images/201912/20/54769/gNg8PsIYUJ.png!large) ![在这里插入图片描述](https://img-blog.csdnimg.cn/20191220151040508.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21pbmdtaW5nMDUwOA==,size_16,color_FFFFFF,t_70) 周鸿祎的客户端: ![在这里插入图片描述](https://img-blog.csdnimg.cn/20191220150744310.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21pbmdtaW5nMDUwOA==,size_16,color_FFFFFF,t_70) 我的后台服务器: ![在这里插入图片描述](https://cdn.learnku.com/uploads/images/201912/20/54769/xcABCSlRIa.png!large) 可以看到我的聊天室对于交互的友好性做了很多努力,加入了一些提示字符,让整个聊天室结构显得更清晰了,后续学完前端还可以进一步完善。需要注意的是最好在Goland-IDE运行我的代码,如果在cmd命令行里运行的话,那些符号都会产生乱码,无法识别。源码会在本文后面给出。 ## 项目流程图 有个逻辑清晰、结构明了的流程图能帮助我们省去很多不必要的麻烦,不然直接开始撸代码的话可能会造成思维混乱,跳不出来。 先来看看培训班给的流程图: ![在这里插入图片描述](https://img-blog.csdnimg.cn/20191220152411966.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L21pbmdtaW5nMDUwOA==,size_16,color_FFFFFF,t_70) ![在这里插入图片描述](https://cdn.learnku.com/uploads/images/201912/20/54769/J4bVrtiP3x.png!large) 然后是我的流程图: ![在这里插入图片描述](https://cdn.learnku.com/uploads/images/201912/20/54769/l7nZiuvedg.png!large) ![在这里插入图片描述](https://cdn.learnku.com/uploads/images/201912/20/54769/nTzHjbTalJ.png!large) 其实对于老师给的流程图我基本上没怎么细看,那就讲讲我自己的流程图吧,第一张图基本上是所有TCP数据传输的整体流程,没什么好说的,学完TCP都应该掌握,只有那个manager函数是这个项目里独有的。第二张图先从客户端开始看,因为服务器在循环监听着通信套接字,客户端是主动发送消息的一端,客户端的消息对应图上绿色的小对话框,服务器收到这个消息后经过一系列处理变成黄色的小对话框,随后经过三个channel又被返回给了客户端。这就是并发聊天室的核心——并发处理流程。 ## 各个功能模块详细说明 ### 全局变量以及关键函数 1. var message = make(chan []byte); 这是一个无缓冲channel,所有客户端发到服务器的消息经过处理后都会被写入这个channel 2. manager()函数,这是我们项目里的管家,专门负责监听着全局channel——message的读端,一旦有消息写进来,manager就负责把消息广播给所有在线用户。 ```go //管家循环监听管道message func manager() { for { select { case msg := <-message: for _, v := range onlineUsers { v.C <- msg } } } } ``` 3. 存储用户信息的结构体,其中C专门处理用户发送的消息,NewUser专门处理用户进入聊天室和退出聊天室的信息 ```go //管家循环监听管道message type userInfo struct { name string C chan []byte NewUser chan []byte //用于广播用户进入或退出当前聊天室的信息 } ``` 4. var onlineUsers = make(map[string]userInfo) 这个map用于存储所有的在线用户,键是该用户的ip+port,值就是第三条说的结构体 5. func HandleConnect(conn net.Conn) 这是专门用于处理服务器与单独客户端之间读写的函数它和accept函数都被放在一个死循环里,accept一旦受到客户端连接请求并连接成功,就会启动go程去执行这个函数,与此同时主go程就循环回去继续阻塞在accept函数,监听着其他的客户端连接请求。 ### 广播用户上线 客户端连接成功后,我会要求他先输入一个用户名,然后客户端把这个用户名使用write发送给服务器,服务器收到这个用户名以后就把他存到全局map里面,把用户名拿来再加上一些提示信息发送给每位用户的channel——NewUser,然后立刻启动go程去监听每个用户自己的channel,一旦有消息写进来,服务器就把从channel里读出来的用户上线信息用write发送给客户端。 ### 广播用户消息 与之前处理广播用户上线类似, ```go msg = append([]byte("💬["+thisUser+"]对大家说:"), buf2[:n]...) ``` 但是这里thisUser不能和上面的读到的用户名一致了,当时在这个问题上困扰了好久。就是说现在面临的问题是:当多个客户端同时连接到服务器,此时服务器如何判断读到的信息来自哪个客户端?我的答案是:当服务器读到客户端发来的消息后,立刻调用conn.RemoteAddr()即可获取当前发送消息的客户端的地址。 ### 展示在线用户名 同样在服务器循环阻塞监听读取客户端消息的后面,加上一个switch分支结构,如果读来的消息和“who”相等的话,就用for遍历一下全局的map ### 修改用户名 指定用户修改用户名的方法是“rename|”+新用户名,只需在上一条的switch分支里加上case,消息的前7个字符和“rename|”相等的话,就执行后续改名的代码。需要注意的是,这里处理完成后紧接着还要在广播用户消息的地方改一下,不然服务器会把“who”、“rename|”这些东西都给广播出去。 ### 用户退出 用户退出主要有那么几种情形: 1. 用户关闭了客户端,或者用户手动停止了当前程序的运行。这个情形不难处理,还是在上面说的那个switch分支里面,加上一个case,如果服务器读出来的内容是0,就说明客户端已经断开连接了。相应的代码片段: ```go n, err := conn.Read(buf2) //用于存储当前与服务器通信的客户端上的那个同户名 thisUser := onlineUsers[conn.RemoteAddr().String()].name switch { case n == 0: fmt.Println(conn.RemoteAddr(), "已断开连接") for _, v := range onlineUsers { if thisUser != "" { v.NewUser <- []byte("💨用户[" + thisUser + "]已退出当前聊天室\n") } } ``` 2. 如果服务器主动在HandleConnect()函数里执行了return,或者服务器程序关闭、停止运行,也会导致客户端的退出,这种情形对应的代码是客户端循环读取服务器消息时,读出内容的长度为0,和上一条是相对的。代码片段: ```go for { buffer2:=make([]byte,4096) n,err:=conn.Read(buffer2) if n==0{ fmt.Println("服务器已关闭当前连接,正在退出……") return } if err!=nil{ fmt.Println("conn.Read error:",err) return } fmt.Print(string(buffer2[:n])) } ``` 3. 第三种情形就是用户长时间没有发送消息,服务器会有一个超时处理的select,负责把这样的用户踢出聊天室。具体内容在下一个超时处理模块分析。 ### 超时处理 当时在解决这个问题的时候也是琢磨了好久尝试过很多种channel,放到过很多位置也没能实现。最后的解决思路是这样的:服务器中有一个go程专门负责循环监听着客户端发来的消息,如果客户端没有任何动作,服务器就会在相应的conn.Read()处阻塞,所以应在HandleConnect函数的开头处定义一个控制超时的channel——overTime,当服务器读到客户端消息后,再给overTime写入“true”,那么负责监听overTime输出端和超时的select代码块就只能放在这个go程的外面。 相应代码片段: ```go for { select { case <-overTime: case <-time.After(time.Second * 60): _, _ = conn.Write([]byte("抱歉,由于长时间未发送聊天内容,您已被系统踢出")) thisUser := onlineUsers[conn.RemoteAddr().String()].name for _, v := range onlineUsers { if thisUser != "" { v.NewUser <- []byte("💨用户[" + thisUser + "]由于长时间未发送消息已被踢出当前聊天室\n") } } delete(onlineUsers, conn.RemoteAddr().String()) return } } ``` 只有不断向overTime写入数据,这个select才不会进入计时。当overTime写入了数据后,这个case不作为,意味着立即重新执行循环,进入计时,也就是老师讲课时说的**重置计时器**。 ## 总结 本文只对服务器的功能模块进行了详细说明,实际开发的时候我是服务器客户端同时来写的,如果你能完全看明白服务器的代码,那么客户端的代码就很容易了,所以这里对于客户端不做介绍。 我在整个项目中最耗时的部分是:整个并发机制的理解,就是说所有这5个channel各自的作用,怎么协同运行的,需要动脑筋去思考;然后是select超时处理那部分,之前我把管家manager放到了监听客户端消息的那个go程里面了,带来了很多麻烦。其实管家manager只需负责监听全局channel,不必写在HandleConnect里面,而且它里面用到的变量也基本都是全局变量,所以完全可以把它放到HandleConnect()外面,单独作为一个go程去运行。由此给我带来的教训是:如果一个函数或代码块与另一个函数之间不存在绝对的关联性,就不要放在另一个函数里面,否则就会产生相互依赖,进而带来麻烦,当一个项目的逻辑越来越复杂,能做到这一点还是很不容易的;还有就是用户修改名称那部分,牵扯出来很多逻辑判断,这里也调试了很久;再然后就是各种小bug了,可以说是不计其数,最后也是一点点得到了解决。 ## 源码 最后给大家贴出我的这个并发聊天室源码和老师给的源码。老师的只有服务器,我的源码里服务器和客户端都有(知道你们都在等这个,哈哈哈哈哈)。先来看我的源码吧。 我的服务器源码: ```go package main import ( "fmt" "net" "time" ) //定义一个全局的channel,用于处理从各个客户端读到的消息 var message = make(chan []byte) //定义一个结构体userInfo,用于存储每位聊天室用户的信息(名称+用户各自的管道C) type userInfo struct { name string C chan []byte NewUser chan []byte //用于广播用户进入或退出当前聊天室的信息 } //定义一个map,用于存储聊天室中所有在线的用户和用户信息 var onlineUsers = make(map[string]userInfo) func main() { listener, err := net.Listen("tcp", "127.0.0.1:8011") if err != nil { fmt.Println("net.Listen error:", err) return } fmt.Println("够浪聊天室-服务器已启动") fmt.Println("正在监听客户端连接请求……") //启动管家go程,不断监听全局channel————message go manager() for { conn, err := listener.Accept() if err != nil { fmt.Println("listener.Accept error:", err) return } fmt.Printf("地址为[%v]的客户端已连接成功\n", conn.RemoteAddr()) // 如果监听到连接请求并成功以后, // 服务器进入下面的go程, // 在该go程中处理服务器和该客户端之间的读写或其他事件 // 与此同时,服务器在主go程中回去继续监听着其他客户端的连接请求 go HandleConnect(conn) } } // 这个函数完成服务器对一个客户端的整套处理流程 func HandleConnect(conn net.Conn) { defer conn.Close() // 管道overTime用于处理超时 overTime := make(chan bool) // 用于存储用户名信息 buf1 := make([]byte, 4096) n, err := conn.Read(buf1) if err != nil { fmt.Println("conn.Read error:", err) return } userName := string(buf1[:n]) //n-1是为了去掉末尾的\n perC := make(chan []byte) perNewUser := make(chan []byte) user := userInfo{name: userName, C: perC, NewUser: perNewUser} onlineUsers[conn.RemoteAddr().String()] = user fmt.Printf("用户[%s]注册成功\n", userName) _, _ = conn.Write([]byte("💟💓💖💞💛你好," + userName + ",欢迎来到『够浪』™聊天室,请畅所欲言!💝💘💗💕💗")) //广播通知。遍历map go func() { for _, v := range onlineUsers { v.NewUser <- []byte("🤵用户[" + userName + "]已加入当前聊天室\n") } }() //监听每位用户自己的channel go func() { for { select { case msg1 := <-user.NewUser: _, _ = conn.Write(msg1) case msg2 := <-user.C: _, _ = conn.Write(msg2) } } }() //循环读取客户端发来的消息 go func() { buf2 := make([]byte, 4096) for { n, err := conn.Read(buf2) //用于存储当前与服务器通信的客户端上的那个同户名 thisUser := onlineUsers[conn.RemoteAddr().String()].name switch { case n == 0: fmt.Println(conn.RemoteAddr(), "已断开连接") for _, v := range onlineUsers { if thisUser != "" { v.NewUser <- []byte("💨用户[" + thisUser + "]已退出当前聊天室\n") } } delete(onlineUsers, conn.RemoteAddr().String()) return case string(buf2[:n]) == "who\n": _, _ = conn.Write([]byte("当前在线用户:\n")) for _, v := range onlineUsers { //fmt.Println(v.name) _, _ = conn.Write([]byte("🟢" + v.name + "\n")) } case len(string(buf2[:n])) > 7 && string(buf2[:n])[:7] == "rename|": //n-1去掉buf2里的空格 onlineUsers[conn.RemoteAddr().String()] = userInfo{name:string(buf2[:n-1])[7:],C: perC, NewUser: perNewUser} _, _ = conn.Write([]byte("您已成功修改用户名!\n")) } if err != nil { fmt.Println("conn.Read error:", err) return } var msg []byte if buf2[0] != 10 && string(buf2[:n]) != "who\n" { if len(string(buf2[:n])) <= 7 || string(buf2[:n])[:7] != "rename|" { msg = append([]byte("💬["+thisUser+"]对大家说:"), buf2[:n]...) } } else { msg = nil } // overTime <- true message <- msg } }() for { select { case <-overTime: case <-time.After(time.Second * 60): _, _ = conn.Write([]byte("抱歉,由于长时间未发送聊天内容,您已被系统踢出")) thisUser := onlineUsers[conn.RemoteAddr().String()].name for _, v := range onlineUsers { if thisUser != "" { v.NewUser <- []byte("💨用户[" + thisUser + "]由于长时间未发送消息已被踢出当前聊天室\n") } } delete(onlineUsers, conn.RemoteAddr().String()) return } } } //管家循环监听管道message func manager() { for { select { case msg := <-message: for _, v := range onlineUsers { v.C <- msg } } } } ``` 我的客户端源码: ```go package main import ( "fmt" "net" "os" ) func main() { fmt.Println("正在连接服务器……") conn,err:=net.Dial("tcp","127.0.0.1:8011") if err!=nil{ fmt.Println("net.Dial error:",err) return } defer conn.Close() fmt.Println("连接服务器成功") fmt.Println("先起一个名字吧:") var userName string //使用Scan输入,不允许出现空格 _, _ = fmt.Scan(&userName) _, _ = conn.Write([]byte(userName)) buf2:=make([]byte,4096) n, err := conn.Read(buf2) if err!=nil{ fmt.Println("conn.Read error:",err) return } // 客户端收到“你好,***,欢迎来到够浪聊天室,请畅所欲言!” fmt.Println(string(buf2[:n])) fmt.Println("⚠提示:长时间没有发送消息会被系统强制踢出") //客户端发送消息到服务器 go func() { for { buffer1:=make([]byte,4096) //这里使用Stdin标准输入,因为scanf无法识别空格 n,err:=os.Stdin.Read(buffer1) if err!=nil{ fmt.Println("os.Stdin.Read error:",err) continue } _, _ = conn.Write(buffer1[:n]) //写操作出现error的概率比较低,这里省去判断 } }() //接收服务器发来的数据 for { buffer2:=make([]byte,4096) n,err:=conn.Read(buffer2) if n==0{ fmt.Println("服务器已关闭当前连接,正在退出……") return } if err!=nil{ fmt.Println("conn.Read error:",err) return } fmt.Print(string(buffer2[:n])) } } ``` 老师给的服务器源码: ```go package main import ( "net" "fmt" "strings" "time" ) // 创建用户结构体类型! type Client struct { C chan string Name string Addr string } // 创建全局map,存储在线用户 var onlineMap map[string]Client // 创建全局 channel 传递用户消息。 var message = make(chan string) func WriteMsgToClient(clnt Client, conn net.Conn) { // 监听 用户自带Channel 上是否有消息。 for msg := range clnt.C { conn.Write([]byte(msg + "\n")) } } func MakeMsg(clnt Client, msg string) (buf string) { buf = "[" + clnt.Addr + "]" + clnt.Name + ": " + msg return } func HandlerConnect(conn net.Conn) { defer conn.Close() // 创建channel 判断,用户是否活跃。 hasData := make(chan bool) // 获取用户 网络地址 IP+port netAddr := conn.RemoteAddr().String() // 创建新连接用户的 结构体. 默认用户是 IP+port clnt := Client{make(chan string), netAddr, netAddr} // 将新连接用户,添加到在线用户map中. key: IP+port value:client onlineMap[netAddr] = clnt // 创建专门用来给当前 用户发送消息的 go 程 go WriteMsgToClient(clnt, conn) // 发送 用户上线消息到 全局channel 中 //message <- "[" + netAddr + "]" + clnt.Name + "login" message <- MakeMsg(clnt, "login") // 创建一个 channel , 用来判断用退出状态 isQuit := make(chan bool) // 创建一个匿名 go 程, 专门处理用户发送的消息。 go func() { buf := make([]byte, 4096) for { n, err := conn.Read(buf) if n == 0 { isQuit <- true fmt.Printf("检测到客户端:%s退出\n", clnt.Name) return } if err != nil { fmt.Println("conn.Read err:", err) return } // 将读到的用户消息,保存到msg中,string 类型 msg := string(buf[:n-1]) // 提取在线用户列表 if msg == "who" && len(msg) == 3 { conn.Write([]byte("online user list:\n")) // 遍历当前 map ,获取在线用户 for _, user := range onlineMap { userInfo := user.Addr + ":" + user.Name + "\n" conn.Write([]byte(userInfo)) } // 判断用户发送了 改名 命令 } else if len(msg) >=8 && msg[:6] == "rename" { // rename| newName := strings.Split(msg, "|")[1] // msg[8:] clnt.Name = newName // 修改结构体成员name onlineMap[netAddr] = clnt // 更新 onlineMap conn.Write([]byte("rename successful\n")) }else { // 将读到的用户消息,写入到message中。 message <- MakeMsg(clnt, msg) } hasData <- true } }() // 保证 不退出 for { // 监听 channel 上的数据流动 select { case <-isQuit: delete(onlineMap, clnt.Addr) // 将用户从 online移除 message <- MakeMsg(clnt, "logout") // 写入用户退出消息到全局channel return case <-hasData: // 什么都不做。 目的是重置 下面 case 的计时器。 case <-time.After(time.Second * 60): delete(onlineMap, clnt.Addr) // 将用户从 online移除 message <- MakeMsg(clnt, "time out leaved") // 写入用户退出消息到全局channel return } } } func Manager() { // 初始化 onlineMap onlineMap = make(map[string]Client) // 监听全局channel 中是否有数据, 有数据存储至 msg, 无数据阻塞。 for { msg := <-message // 循环发送消息给 所有在线用户。要想执行,必须 msg := <-message 执行完, 解除阻塞。 for _, clnt := range onlineMap { clnt.C <- msg } } } func main() { // 创建监听套接字 listener, err := net.Listen("tcp", "127.0.0.1:8000") if err != nil { fmt.Println("Listen err", err) return } defer listener.Close() // 创建管理者go程,管理map 和全局channel go Manager() // 循环监听客户端连接请求 for { conn, err := listener.Accept() if err != nil { fmt.Println("Accept err", err) return } // 启动go程处理客户端数据请求 go HandlerConnect(conn) } } ```

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:muxilin131420 备注:入群;或加QQ群:729884609

1193 次点击  ∙  2 赞  
加入收藏 微博
3 回复  |  直到 2019-12-22 10:03:16
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传