go: 如何编写一个正确的udp服务端?

udp的服务端有一个大坑,即如果收包不及时,在系统缓冲写满后,将大量丢包。

在网上通常的示例中,一般在for循环中执行操作逻辑。这在生产环境将是一个隐患。是的,俺就翻车了。

go强大简易的并发能力可以用在处理udp数据上。

     PoolSizeUDP := 1472
        listener, err := net.ListenUDP("udp", &net.UDPAddr{
                IP:   net.ParseIP(listenIP),
                Port: port,
        })
        if err != nil {
                logrus.Fatalf("RunUdpServer failed to listen: %v", err)
                return nil
        }
        // 如果还不行,考虑把系统的buffer设大一点
        // listener.SetReadBuffer(1024 * 1024 * 8)
        // listener.SetWriteBuffer(1024 * 1024 * 8)
        var data = make([]byte, PoolSizeUDP)
        chLimit := make(chan int, 64) // 最多创建64个协程,避免内存爆炸
        for {
                select {
                case <-ctx.Done():
                        return nil
                default:
                }
                n, addr, err := listener.ReadFromUDP(data)
                if err != nil {
                        logrus.Errorf("RunUdpServer ReadFromUDP err: %v", err)
                        continue
                }
                raw := make([]byte, n) // 重点注意,每次循环都必须创建新的raw变量,否则踩内存
                copy(raw, data[:n])
                chLimit <- 1
                go func(udpMsg []byte) {
                        // 拿 udpMsg 做点什么
                        defer func() {
                                <-chLimit
                        }()
                        DoSth(udpMsg)
                }(raw)
        }

注意点:

  1. data可以在循环外创建,复用即可。每次ReadFromUDP并不会受到上次数据残留的影响。
  2. 不要在for中执行重逻辑,避免等待太久时间udp大量丢包。所以每次收到udpMsg,都交给go协程来处理。
  3. raw必须每次在循环内创建,否则在后面的go并发会踩内存。
  4. SetReadBuffer这个配置很有用

更新:上面的示例为了避免在后续的go中,有不可控的异步操作引用了数据导致踩内存,每次收消息都分配了新的[]byte

raw := make([]byte, n) // 重点注意,每次循环都必须创建新的raw变量,否则踩内存
copy(raw, data[:n])

经大佬提醒,这其实是一个不小的开销。如果go中执行的行为可控,引入sync.Pool可以很方便的做内存复用。

var udpBytesPool = sync.Pool{
        New: func() any {
                return make([]byte, PoolSizeUDP)
        },
}
......
        for {
                data := udpBytesPool.Get().([]byte)
                n, addr, err := listener.ReadFromUDP(data)
                if err != nil {
                        logrus.Errorf("RunUdpServer ReadFromUDP err: %v", err)
                        continue
                }
                go func(){
                        defer udpBytesPool.Put(data) // 注意,在协程退出执行这个操作时,一定确认 data不会再被引用了
                // do sth
                }()
        }