Go语言开发im-websocket服务和vue3+ts开发类似微信pc即时通讯

huanglishi · 3月之前 · 703 次点击 · 大约8小时之前 开始浏览    置顶

前言

IM即时通讯聊天, 为软件开发者打造,不依赖第三方sdk,完全用Go语言开发即时通讯服务,支持H5、Electron、Wails 、Uniapp和各种小程序的IM即时通讯, 快速实现私聊、群聊、在线客服!让你快速搭建一个微信聊天系统,打造一个类微信聊天应用。

内容如下:

  • 完全基于GoFly框架开发即时通讯服务器,不依赖第三方即时通讯SDK,减少维护成本。
  • 支持gofly管理后台、H5、Electron、Wails 、Uniapp和各种小程序的IM即时通讯
  • 一对一单聊
  • 群聊
  • 在线客服

后端选择技术栈:

  • 开发语言:Golang
  • 基础框架:Gin
  • 集成框架:GoFly快速开发框架
  • 数据库:mysql(可迁移PostgreSQL、SQL-Server、oracle)

前端选择技术栈:

  • 脚手架搭建:vite
  • web框架:vue3
  • 前端语言:TypeScript
  • 前端UI:ArcoDesign

通讯协议:

即时通讯协议:websocket,通讯核心代码如下:

  • Go服务端代码:
package websocket

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

var (
    // 消息通道
    news = make(map[string]chan interface{})
    // websocket客户端链接池
    client = make(map[string]*websocket.Conn)
    // 互斥锁,防止程序对统一资源同时进行读写
    mux sync.Mutex
)

// websocket Upgrader
var wsupgrader = websocket.Upgrader{
    ReadBufferSize:   1024,
    WriteBufferSize:  1024,
    HandshakeTimeout: 5 * time.Second,
    // 取消ws跨域校验
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// WsHandler 处理ws请求
func WsHandler(w http.ResponseWriter, r *http.Request, id string) {
    var conn *websocket.Conn
    var err error
    var exist bool
    // 创建一个定时器用于服务端心跳
    pingTicker := time.NewTicker(time.Second * 10)
    conn, err = wsupgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println("处理ws请求错误", err)
        return
    }
    // 把与客户端的链接添加到客户端链接池中
    addClient(id, conn)
    // 获取该客户端的消息通道
    m, exist := getNewsChannel(id)
    if !exist {
        m = make(chan interface{})
        addNewsChannel(id, m)
    }
    // 设置客户端关闭ws链接回调函数
    conn.SetCloseHandler(func(code int, text string) error {
        deleteClient(id)
        fmt.Println("端关闭ws链接回调函数错误", code)
        return nil
    })

    for {
        select {
        case content, _ := <-m:
            // 从消息通道接收消息,然后推送给前端
            fmt.Println("从消息通道接收消息:", content)
            err = conn.WriteJSON(content)
            if err != nil {
                fmt.Println("推送给前端数错误", err)
                conn.Close()
                deleteClient(id)
                return
            }
        case <-pingTicker.C:
            // 服务端心跳:每20秒ping一次客户端,查看其是否在线
            conn.SetWriteDeadline(time.Now().Add(time.Second * 20))
            err = conn.WriteMessage(websocket.PingMessage, []byte{})
            if err != nil {
                fmt.Println("send ping err:", err)
                conn.Close()
                deleteClient(id)
                return
            }
        }
    }
}

// 将客户端添加到客户端链接池
func addClient(id string, conn *websocket.Conn) {
    mux.Lock()
    client[id] = conn
    mux.Unlock()
}

// 获取指定客户端链接
func getClient(id string) (conn *websocket.Conn, exist bool) {
    mux.Lock()
    conn, exist = client[id]
    mux.Unlock()
    return
}

// 删除客户端链接
func deleteClient(id string) {
    mux.Lock()
    delete(client, id)
    fmt.Println("websocket退出:", id)
    mux.Unlock()
}

// 添加用户消息通道
func addNewsChannel(id string, m chan interface{}) {
    mux.Lock()
    news[id] = m
    mux.Unlock()
}

// 获取指定用户消息通道
func getNewsChannel(id string) (m chan interface{}, exist bool) {
    mux.Lock()
    m, exist = news[id]
    mux.Unlock()
    return
}

// 删除指定消息通道
func deleteNewsChannel(id string) {
    mux.Lock()
    if m, ok := news[id]; ok {
        close(m)
        delete(news, id)
    }
    mux.Unlock()
}

// 1.对点消息推送
func SetMessage(id string, content interface{}) {
    mux.Lock()
    if m, exist := news[id]; exist {
        go func() {
            m <- content
        }()
    }
    mux.Unlock()
}

// 2.群发消息
func SetMessageAllClient(content interface{}) {
    mux.Lock()
    all := news
    mux.Unlock()
    go func() {
        for _, m := range all {
            m <- content
        }
    }()

}
  • 前端ts代码:
// WebSocket链接工具
import {  onUnmounted } from 'vue';

interface WebSocketOptions {
    url: string;
    protocols?: string | string[];
    reconnectTimeout?: number;
}

class WebSocketService {
    private ws: WebSocket | null = null;
    private callbacks: { [key: string]: Function[] } = {};
    private reconnectTimeoutMs: number = 5000; // 默认5秒重连间隔
    constructor(private options: WebSocketOptions) {}
    //实现断线重连
    private reconnectAttempts = 0;
    private maxReconnectAttempts = 5;

    public open(): void {
        if(!this.ws){
            this.ws = new WebSocket(this.options.url, this.options.protocols)
            this.ws.addEventListener('open', this.handleOpen);
            this.ws.addEventListener('message', this.handleMessage);
            this.ws.addEventListener('error', this.handleError);
            this.ws.addEventListener('close', this.handleClose);
              //为了保持连接的稳定性,我们可以添加心跳机制
            this.startHeartbeat();
        }
    }
    //连接
    public connect(url:any): void {
        if(url){
            this.options.url=url
            this.open();
        }else{
            console.error("请传url链接地址")
        }
    }
    public close(isActiveClose = false): void {
        if (this.ws) {
            this.ws.close();
            if (!isActiveClose) {
                setTimeout(() => this.reconnect(), this.reconnectTimeoutMs);
            }
        }
    }
    //重连
    public reconnect(): void {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            console.log(`尝试重新连接... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
            setTimeout(() => {
                this.open();
            }, this.reconnectTimeoutMs);
        } else {
           console.error('达到最大重连次数,连接失败');
        }
    }

    public on(event: 'message', callback: (data: any) => void): void;
    public on(event: 'open' | 'error' | 'close', callback: () => void): void;
    public on(event: string, callback: (...args: any[]) => void): void {
        if (!this.callbacks[event]) {
            this.callbacks[event] = [];
        }
        this.callbacks[event].push(callback);
    }

    private handleOpen = (): void => {
        console.log('WebSocket连接已建立');
        if (this.callbacks.open) {
            this.callbacks.open.forEach((cb) => cb());
        }
        //实现断线重连
        this.reconnectAttempts = 0;
        this.startHeartbeat();
    };

    private handleMessage = (event: MessageEvent): void => {
        const data = JSON.parse(event.data);
        // console.log('WebSocket接收到消息:', data);
        if (this.callbacks.message) {
            this.callbacks.message.forEach((cb) => cb(data));
        }
    };

    private handleError = (error: Event): void => {
        console.error('WebSocket错误:', error);
        if (this.callbacks.error) {
            this.callbacks.error.forEach((cb) => cb(error));
        }
    };

    private handleClose = (): void => {
        console.log('WebSocket连接已关闭');
        //实现断线重连
        this.ws=null
        this.stopHeartbeat();
        this.reconnect();

        if (this.callbacks.close) {
            this.callbacks.close.forEach((cb) => cb());
            if (!this.options.reconnectTimeout) {
                this.reconnect();
            }
        }
    };

    public send(data: any): void {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(data));
        } else {
            console.warn('尝试发送消息时WebSocket未连接');
        }
    }
    //添加心跳机制
    private heartbeatTimer: any | null = null;
    private heartbeatInterval = 30000; // 30秒
    private startHeartbeat() {
      this.heartbeatTimer = setInterval(() => {
        this.send({ type: 'heartbeat' });
      }, this.heartbeatInterval);
    }
    private stopHeartbeat() {
        if (this.heartbeatTimer) {
          clearInterval(this.heartbeatTimer);
          this.heartbeatTimer = null;
        }
      }

}

export default function useWebSocket(options: WebSocketOptions) {
    const wsService = new WebSocketService(options);
    onUnmounted(() => {
        wsService.close(true);
    });
    return {
        open: wsService.open.bind(wsService),
        connect: wsService.connect.bind(wsService),
        close: wsService.close.bind(wsService),
        reconnect: wsService.reconnect.bind(wsService),
        on: wsService.on.bind(wsService),
        send: wsService.send.bind(wsService)
    };
  }

效果如下:

微信截图_20240916220324.png

微信截图_20240916220247.png

微信截图_20240916220341.png 微信截图_20240916220435.png

下载代码:

下载地址


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

703 次点击  ∙  1 赞  
加入收藏 微博
1 回复  |  直到 2024-09-20 10:35:36
kyomic
kyomic · #1 · 3月之前

源码?你应该说是部分代码

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传