问题描述:
在linux arm64 环境 (linux amd64环境没有问题)
开启服务监听,然后给客户端写大量数据,测试并发性,比如模拟10个客户端接受数据,在allsend函数中,使用随机获取的数据大小,那么发送就会阻塞 conn.Write([]byte) 。 如果使用固定的数据大小比如10k, 那么就不会阻塞。
服务端代码:
```
package server
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
)
//写数据的缓冲chan大小
var CACHE_SIZE = 20
//用户数据结构
type User struct {
conn net.Conn //socket连接
writeChan chan []byte //写出序列
}
//新建用户
func NewUser() *User {
user := &User{}
user.writeChan = make(chan []byte, CACHE_SIZE)
return user
}
var g_users = sync.Map{} //conn:User
func Run() {
//开启转发服务
go func() {
fmt.Println("启动h264转发服务,端口", 5559)
service := ":5559"
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
if err != nil {
panic(err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
panic(err)
}
go allsend()
for {
conn, err := listener.Accept()
if err != nil {
continue
}
fmt.Println("有h264客户端连接了")
user := NewUser()
user.conn = conn
g_users.Store(conn, user)
go read(user)
go socketwrite(user)
}
}()
}
//发送给所有客户端
func allsend() {
for {
var onesize = 10000
//随机生成发送数据大小
r := rand.New(rand.NewSource(time.Now().UnixNano()))
n := r.Intn(onesize) + onesize/2
wirtedata := make([]byte, n)
g_users.Range(func(k, v interface{}) bool {
select {
case v.(*User).writeChan <- wirtedata:
default:
fmt.Println("阻塞了")
}
return true
})
<-time.After(time.Millisecond * 50)
}
}
//socket中读取数据
func read(user *User) {
b := make([]byte, 10000)
for {
if _, e := user.conn.Read(b); e != nil {
breakClient(user)
} else {
break
}
}
}
var sendsize = 0
var sendtime = time.Now().Unix()
//socket写数据
func socketwrite(user *User) {
for {
select {
case tw, b := <-user.writeChan:
if b == false {
//说明关闭了chan
return
}
if _, e := user.conn.Write(tw); e != nil {
breakClient(user)
}
sendsize += len(tw)
case <-time.After(time.Second):
//如果超时1秒还没娶到数据 说明没数据了
fmt.Println("----------------------------------------------")
}
if time.Now().Unix() > sendtime {
sendtime = time.Now().Unix()
fmt.Println(" 每秒发送(MB):", sendsize/1024/1024)
sendsize = 0
}
}
}
//断开客户端连接
func breakClient(user *User) {
defer func() {
recover()
}()
g_users.Delete(user.conn)
close(user.writeChan)
user.conn.Close()
}
```
客户端代码:
```
package socketclient
import (
"fmt"
"net"
"os"
"time"
)
var sendsize = 0
var sendtime = time.Now().Unix()
func Newclient() {
server := Serverip + ":5559"
tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
fmt.Println("connection success")
go clientRead(conn)
}
//读取客户端
func clientRead(conn net.Conn) {
request := make([]byte, 50000)
for {
readLen, err := conn.Read(request)
if err != nil {
fmt.Println("有客户端断开了")
break
}
sendsize += readLen
if time.Now().Unix() > sendtime {
sendtime = time.Now().Unix()
fmt.Println("用户数量:", Count, "每秒收到(Mb):", sendsize*8/1024/1024)
sendsize = 0
}
}
}
```
有疑问加站长微信联系(非本文作者)