前言
IM即时通讯聊天, 为软件开发者打造,不依赖第三方sdk,完全用Go语言开发即时通讯服务,支持H5、Electron、Wails 、Uniapp和各种小程序的IM即时通讯, 快速实现私聊、群聊、在线客服!让你快速搭建一个微信聊天系统,打造一个类微信聊天应用。
内容如下:
- 完全基于GoFly框架开发即时通讯服务器,不依赖第三方即时通讯SDK,减少维护成本。
- 支持gofly管理后台、H5、Electron、Wails 、Uniapp和各种小程序的IM即时通讯
- 一对一单聊
- 群聊
- 在线客服
后端选择技术栈:
- 开发语言:Golang
- 基础框架:Gin
- 集成框架:GoFly快速开发框架
- 数据库:mysql(可迁移PostgreSQL、SQL-Server、oracle)
前端选择技术栈:
- 脚手架搭建:vite
- web框架:vue3
- 前端语言:TypeScript
- 前端UI:ArcoDesign
通讯协议:
即时通讯协议:websocket,通讯核心代码如下:
- Go服务端代码:
package websocket
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var (
// 消息通道
news = make(map[string]chan interface{})
// websocket客户端链接池
client = make(map[string]*websocket.Conn)
// 互斥锁,防止程序对统一资源同时进行读写
mux sync.Mutex
)
// websocket Upgrader
var wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
HandshakeTimeout: 5 * time.Second,
// 取消ws跨域校验
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// WsHandler 处理ws请求
func WsHandler(w http.ResponseWriter, r *http.Request, id string) {
var conn *websocket.Conn
var err error
var exist bool
// 创建一个定时器用于服务端心跳
pingTicker := time.NewTicker(time.Second * 10)
conn, err = wsupgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("处理ws请求错误", err)
return
}
// 把与客户端的链接添加到客户端链接池中
addClient(id, conn)
// 获取该客户端的消息通道
m, exist := getNewsChannel(id)
if !exist {
m = make(chan interface{})
addNewsChannel(id, m)
}
// 设置客户端关闭ws链接回调函数
conn.SetCloseHandler(func(code int, text string) error {
deleteClient(id)
fmt.Println("端关闭ws链接回调函数错误", code)
return nil
})
for {
select {
case content, _ := <-m:
// 从消息通道接收消息,然后推送给前端
fmt.Println("从消息通道接收消息:", content)
err = conn.WriteJSON(content)
if err != nil {
fmt.Println("推送给前端数错误", err)
conn.Close()
deleteClient(id)
return
}
case <-pingTicker.C:
// 服务端心跳:每20秒ping一次客户端,查看其是否在线
conn.SetWriteDeadline(time.Now().Add(time.Second * 20))
err = conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
fmt.Println("send ping err:", err)
conn.Close()
deleteClient(id)
return
}
}
}
}
// 将客户端添加到客户端链接池
func addClient(id string, conn *websocket.Conn) {
mux.Lock()
client[id] = conn
mux.Unlock()
}
// 获取指定客户端链接
func getClient(id string) (conn *websocket.Conn, exist bool) {
mux.Lock()
conn, exist = client[id]
mux.Unlock()
return
}
// 删除客户端链接
func deleteClient(id string) {
mux.Lock()
delete(client, id)
fmt.Println("websocket退出:", id)
mux.Unlock()
}
// 添加用户消息通道
func addNewsChannel(id string, m chan interface{}) {
mux.Lock()
news[id] = m
mux.Unlock()
}
// 获取指定用户消息通道
func getNewsChannel(id string) (m chan interface{}, exist bool) {
mux.Lock()
m, exist = news[id]
mux.Unlock()
return
}
// 删除指定消息通道
func deleteNewsChannel(id string) {
mux.Lock()
if m, ok := news[id]; ok {
close(m)
delete(news, id)
}
mux.Unlock()
}
// 1.对点消息推送
func SetMessage(id string, content interface{}) {
mux.Lock()
if m, exist := news[id]; exist {
go func() {
m <- content
}()
}
mux.Unlock()
}
// 2.群发消息
func SetMessageAllClient(content interface{}) {
mux.Lock()
all := news
mux.Unlock()
go func() {
for _, m := range all {
m <- content
}
}()
}
- 前端ts代码:
// WebSocket链接工具
import { onUnmounted } from 'vue';
interface WebSocketOptions {
url: string;
protocols?: string | string[];
reconnectTimeout?: number;
}
class WebSocketService {
private ws: WebSocket | null = null;
private callbacks: { [key: string]: Function[] } = {};
private reconnectTimeoutMs: number = 5000; // 默认5秒重连间隔
constructor(private options: WebSocketOptions) {}
//实现断线重连
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
public open(): void {
if(!this.ws){
this.ws = new WebSocket(this.options.url, this.options.protocols)
this.ws.addEventListener('open', this.handleOpen);
this.ws.addEventListener('message', this.handleMessage);
this.ws.addEventListener('error', this.handleError);
this.ws.addEventListener('close', this.handleClose);
//为了保持连接的稳定性,我们可以添加心跳机制
this.startHeartbeat();
}
}
//连接
public connect(url:any): void {
if(url){
this.options.url=url
this.open();
}else{
console.error("请传url链接地址")
}
}
public close(isActiveClose = false): void {
if (this.ws) {
this.ws.close();
if (!isActiveClose) {
setTimeout(() => this.reconnect(), this.reconnectTimeoutMs);
}
}
}
//重连
public reconnect(): void {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`尝试重新连接... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.open();
}, this.reconnectTimeoutMs);
} else {
console.error('达到最大重连次数,连接失败');
}
}
public on(event: 'message', callback: (data: any) => void): void;
public on(event: 'open' | 'error' | 'close', callback: () => void): void;
public on(event: string, callback: (...args: any[]) => void): void {
if (!this.callbacks[event]) {
this.callbacks[event] = [];
}
this.callbacks[event].push(callback);
}
private handleOpen = (): void => {
console.log('WebSocket连接已建立');
if (this.callbacks.open) {
this.callbacks.open.forEach((cb) => cb());
}
//实现断线重连
this.reconnectAttempts = 0;
this.startHeartbeat();
};
private handleMessage = (event: MessageEvent): void => {
const data = JSON.parse(event.data);
// console.log('WebSocket接收到消息:', data);
if (this.callbacks.message) {
this.callbacks.message.forEach((cb) => cb(data));
}
};
private handleError = (error: Event): void => {
console.error('WebSocket错误:', error);
if (this.callbacks.error) {
this.callbacks.error.forEach((cb) => cb(error));
}
};
private handleClose = (): void => {
console.log('WebSocket连接已关闭');
//实现断线重连
this.ws=null
this.stopHeartbeat();
this.reconnect();
if (this.callbacks.close) {
this.callbacks.close.forEach((cb) => cb());
if (!this.options.reconnectTimeout) {
this.reconnect();
}
}
};
public send(data: any): void {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
} else {
console.warn('尝试发送消息时WebSocket未连接');
}
}
//添加心跳机制
private heartbeatTimer: any | null = null;
private heartbeatInterval = 30000; // 30秒
private startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
this.send({ type: 'heartbeat' });
}, this.heartbeatInterval);
}
private stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
}
export default function useWebSocket(options: WebSocketOptions) {
const wsService = new WebSocketService(options);
onUnmounted(() => {
wsService.close(true);
});
return {
open: wsService.open.bind(wsService),
connect: wsService.connect.bind(wsService),
close: wsService.close.bind(wsService),
reconnect: wsService.reconnect.bind(wsService),
on: wsService.on.bind(wsService),
send: wsService.send.bind(wsService)
};
}
效果如下:
下载代码:
有疑问加站长微信联系(非本文作者)
源码?你应该说是部分代码