请教关闭TCP监听时的争用问题

visli · 2022-11-30 08:52:17 · 2618 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2022-11-30 08:52:17 的主题,其中的信息可能已经有所发展或是发生改变。

我一个TCP服务,希望从外部关闭Listener,但这样就在listener.Close()与listener.Accept()之间产生了race。请教有什么好的解决办法? 简单的示例代码如下, 加"-race"参数运行,再ctrl+c退出运行,就会报race问题:

package main

import (
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"
)

var listener *net.TCPListener

// tcp服务
func tcpServer() {
    addr, err := net.ResolveTCPAddr("tcp4", ":6666")
    if err != nil {
        log.Fatalln(err)
    }
    listener, err = net.ListenTCP("tcp4", addr)
    if err != nil {
        log.Fatalln(err)
    }

    for {
        c, err := listener.AcceptTCP()
        if err != nil {
            if err.(*net.OpError).Err == net.ErrClosed {
                log.Println("listener closed")
                return
            }
            log.Println(err)
            continue
        }
        connHandler(c)
    }
}

// 新连接处理
func connHandler(c *net.TCPConn) {
    defer func() {
        c.Close()
        log.Printf("%s closed", c.RemoteAddr().String())
    }()
    log.Printf("New connection: %s", c.RemoteAddr().String())
    buf := make([]byte, 128)
    for {
        n, err := c.Read(buf)
        if err != nil {
            return
        }
        log.Println(buf[:n])
    }
}

// 关闭listener
func listenerClose() {
    if listener != nil {
        listener.Close()
    }
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go tcpServer()
    <-sigs
    listenerClose()
    time.Sleep(time.Millisecond * 200)
}

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2618 次点击  
加入收藏 微博
14 回复  |  直到 2022-12-05 18:21:13
jan-bar
jan-bar · #1 · 2年之前

我的写法核心就是让Accept()操作设置超时时间,保证可以执行其他代码。

或者关闭代码就在你的程序里面模拟tcp连接发送特殊字符,然后服务端收到特殊字符就走退出逻辑。

package main

import (
    "context"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// tcp服务
func tcpServer(ctx context.Context) {
    addr, err := net.ResolveTCPAddr("tcp4", ":6666")
    if err != nil {
        log.Fatalln(err)
    }
    listener, err := net.ListenTCP("tcp4", addr)
    if err != nil {
        log.Fatalln(err)
    }

    for {
        select {
        case <-ctx.Done():
            err = listener.Close()
            if err != nil {
                log.Fatalln(err)
            }
            return
        default:
            // 控制超时时间,让代码有机会执行ctx.Done()
            _ = listener.SetDeadline(time.Now().Add(time.Second))
            c, err := listener.AcceptTCP()
            if err != nil {
                if opErr, ok := err.(*net.OpError); ok {
                    if opErr.Timeout() {
                        continue
                    }
                    if opErr.Err == net.ErrClosed {
                        log.Println("listener closed")
                        return
                    }
                }
                log.Println(err)
                continue
            }
            connHandler(c)
        }
    }
}

// 新连接处理
func connHandler(c *net.TCPConn) {
    defer func() {
        c.Close()
        log.Printf("%s closed", c.RemoteAddr().String())
    }()
    log.Printf("New connection: %s", c.RemoteAddr().String())
    buf := make([]byte, 128)
    for {
        n, err := c.Read(buf)
        if err != nil {
            return
        }
        log.Println(buf[:n])
    }
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    go tcpServer(ctx)
    <-sigs
    cancel()
    time.Sleep(time.Millisecond * 200)
}
visli
visli · #2 · 2年之前

@jan-bar 谢谢。目前只有这个办法了。

zzustu
zzustu · #3 · 2年之前

net.Listen() 的时候就支持传入一个 context,demo 如下:

package main

import (
    "context"
    "net"
)

var cancelFunc context.CancelFunc

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancelFunc = cancel

    lc := new(net.ListenConfig)
    lis, _ := lc.Listen(ctx, "tcp4", ":6666") // demo 忽略 error
    for {
        conn, _ := lis.Accept() // demo 忽略 error
        tcpConn := conn.(*net.TCPConn)
        // TODO handle tcpConn
    }
}

func Close() {
    if cancelFunc != nil {
        cancelFunc()
    }
}
jan-bar
jan-bar · #4 · 2年之前

3楼 @zzustu 我看了下源码,lc.Listen(ctx)这个ctx不会生效到Accept里面,只会控制lc.Listen(ctx)过程中的协程。所以这个方案不行额。而且你的cancelFunc又回到原问题了,还是产生了race。

zzustu
zzustu · #5 · 2年之前
jan-barjan-bar #4 回复

3楼 @zzustu 我看了下源码,`lc.Listen(ctx)`这个`ctx`不会生效到`Accept`里面,只会控制`lc.Listen(ctx)`过程中的协程。所以这个方案不行额。而且你的`cancelFunc `又回到原问题了,还是产生了race。

不会生效到 Accept() 里面是什么意思,是:tcpConn 不会继承 Listener 的 ctx 吗?cancelFunc 在哪里 race 了

jan-bar
jan-bar · #6 · 2年之前
zzustuzzustu #5 回复

#4楼 @jan-bar 不会生效到 `Accept()` 里面是什么意思,是:tcpConn 不会继承 Listener 的 ctx 吗?cancelFunc 在哪里 race 了

Accept()源码里面没有任何地方用到ctx,所以你的ctx影响不到Accept()。你的cancelFunc是全局变量,题主的listener也是全局变量,全局变量如果不加锁保护,多个协程访问就会race。除非你的全局变量不会在不同协程里面同时读写。如果所有协程都读全局变量(永远不写时)也不会race的。你可以用go build -race开启竟态检查,如果有资源出现race会打印一些信息。

zzustu
zzustu · #7 · 2年之前
jan-barjan-bar #6 回复

#5楼 @zzustu `Accept()`源码里面没有任何地方用到`ctx`,所以你的`ctx`影响不到`Accept()`。你的`cancelFunc `是全局变量,题主的`listener `也是全局变量,全局变量如果不加锁保护,多个协程访问就会race。除非你的全局变量不会在不同协程里面同时读写。如果所有协程都读全局变量(永远不写时)也不会race的。你可以用`go build -race`开启竟态检查,如果有资源出现race会打印一些信息。

确实,试了下,Listener 的 ctx 影响不到 Accept()

xwszt
xwszt · #8 · 2年之前

问一下楼主:为什么要把监听器作为全局变量?还要在另外一个地方去做Close?这样做的理由是什么

visli
visli · #9 · 2年之前

@xwszt 其实最初没把Listener做成全局变量,也不存在争用问题。前天在做代码审查时,发现一个专门处理服务关闭事件的协程,基本是空闲的,而项目里有许多Listener,就想把这些Listener都统一到这个协程里来作通知关闭管理,所以才改成全局变量的。

xwszt
xwszt · #10 · 2年之前

如果是这样的话,应该使用channel接收关闭通知,而不是把Listener作为全局变量

visli
visli · #11 · 2年之前

@xwszt</a> , 最初是用channel来通知关闭的。外部goroutine close掉这个channel来通知。但这样需要在创建listener的协程中另起一个协程来监听这个channel。多个listener就起了多个监听的协程,感觉浪费。

大致代码如下:

func NewSrv() {
  ln, err := net.ListenTCP()
  defer ln.Close()
  ...
  go func() {
      <- exitChann
      ln.Close()
  }

  for {
    c, err:= ln.Accept()
    ....
  }

}

现在改成了1楼的建议:

func NewSrv() {
  ln, err := net.ListenTCP()
  defer ln.Close()
  ...

  for {
    select {
      case <- exitChann: return
      default:
           ln.SetDeadlin()
           c, err := ln.Accept()
           ....

    }
  }

}
```" name="content" class="comment-textarea" rows="8" style="width: 100%;">@xwszt , 最初是用channel来通知关闭的。外部goroutine close掉这个channel来通知。但这样需要在创建listener的协程中另起一个协程来监听这个channel。多个listener就起了多个监听的协程,感觉浪费。

大致代码如下:

func NewSrv() { ln, err := net.ListenTCP() defer ln.Close() ... go func() { <- exitChann ln.Close() }

for { c, err:= ln.Accept() .... }

}


现在改成了1楼的建议:

func NewSrv() { ln, err := net.ListenTCP() defer ln.Close() ...

for { select { case <- exitChann: return default: ln.SetDeadlin() c, err := ln.Accept() ....

}

}

} ```

问题的关键是listener非线程安全的,但又不好用加锁等方法来做同步

xwszt
xwszt · #12 · 2年之前

11楼 @visli

func NewSrv() {
   go func(){
        ln, err := net.ListenTCP()
        defer ln.Close()
        ...

        for {
            c, err:= ln.Accept()
             ....
        }
   }
  <- exitChan
}

主程运行结束,子协程也会结束。

buscoop
buscoop · #13 · 2年之前

那最终方案是什么

visli
visli · #14 · 2年之前

@buscoop 最终方案就是1楼的办法。 另外,我最初的这个方案,似乎也不会报race问题,这是令我困惑的。

func NewSrv() {
  ln, err := net.ListenTCP()
  defer ln.Close()
  ...
  go func() {
      <- exitChann
      ln.Close()
  }

  for {
    c, err:= ln.Accept()
    ....
  }

}
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传