Go实现基于WebSocket的弹幕服务

拉模式和推模式

拉模式

1、数据更新频率低,则大多数请求是无效的

2、在线用户量多,则服务端的查询负载高

3、定时轮询拉取,实时性低

推模式

1、仅在数据更新时才需要推送

2、需要维护大量的在线长连接

3、数据更新后可以立即推送

基于webSocket推送

1、浏览器支持的socket编程,轻松维持服务端长连接

2、基于TCP可靠传输之上的协议,无需开发者关心通讯细节

3、提供了高度抽象的编程接口,业务开发成本较低

webSocket协议与交互

通讯流程

客户端->upgrade->服务端

客户端<-switching<-服务端

客户端->message->服务端

客户端<-message<-服务端

实现http服务端

1、webSocket是http协议upgrade而来

2、使用http标准库快速实现空接口:/ws

webSocket握手

1、使用webSocket.Upgrader完成协议握手,得到webSocket长连接

2、操作webSocket api,读取客户端消息,然后原样发送回去

封装webSocket

缺乏工程化设计

1、其他代码模块,无法直接操作webSocket连接

2、webSocket连接非线程安全,并发读/写需要同步手段

隐藏细节,封装api

1、封装Connection结构,隐藏webSocket底层连接

2、封装Connection的api,提供Send/Read/Close等线程安全接口

api原理(channel是线程安全的)

1、SendMessage将消息投递到out channel

2、ReadMessage从in channel读取消息

内部原理

1、启动读协程,循环读取webSocket,将消息投递到in channel

2、启动写协程,循环读取out channel,将消息写给webSocket

// server.go
package main

import (
        "net/http"
        "github.com/gorilla/websocket"
        "./impl"
        "time"
)

var (
        upgrader = websocket.Upgrader{
                //允许跨域
                CheckOrigin: func(r *http.Request) bool {
                        return true
                },
        }
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
        var (
                wsConn *websocket.Conn
                err error
                conn *impl.Connection
                data []byte
        )

        //Upgrade:websocket
        if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
                return
        }
        if conn, err = impl.InitConnection(wsConn); err != nil {
                goto ERR
        }

        go func() {
                var (
                        err error
                )
                for {
                        if err =conn.WriteMessage([]byte("heartbeat")); err != nil {
                                return
                        }
                        time.Sleep(1 * time.Second)
                }
        }()

        for {
                if data, err = conn.ReadMessage(); err != nil {
                        goto ERR
                }
                if err = conn.WriteMessage(data); err != nil {
                        goto ERR
                }

        }

        ERR:
                //关闭连接
                conn.Close()
}

func main() {
        //http:localhost:7777/ws
        http.HandleFunc("/ws", wsHandler)
        http.ListenAndServe("0.0.0.0:7777", nil)
}
// connection.go
package impl

import (
        "github.com/gorilla/websocket"
        "sync"
        "github.com/influxdata/platform/kit/errors"
)

var once sync.Once

type Connection struct {
        wsConn *websocket.Conn
        inChan chan []byte
        outChan chan []byte
        closeChan chan byte
        isClosed bool
        mutex sync.Mutex
}

func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
        conn = &Connection{
                wsConn:wsConn,
                inChan:make(chan []byte, 1000),
                outChan:make(chan []byte, 1000),
                closeChan:make(chan byte, 1),
        }

        //启动读协程
        go conn.readLoop()

        //启动写协程
        go conn.writeLoop()

        return
}

//API
func (conn *Connection) ReadMessage() (data []byte, err error) {
        select {
        case data = <- conn.inChan:
        case <- conn.closeChan:
                err = errors.New("connection is closed")
        }
        return
}

func (conn *Connection) WriteMessage(data []byte) (err error) {
        select {
        case conn.outChan <- data:
        case <- conn.closeChan:
                err = errors.New("connection is closed")
        }
        return
}

func (conn *Connection) Close() {
        // 线程安全的close,可重入
        conn.wsConn.Close()
        conn.mutex.Lock()
        if !conn.isClosed {
                close(conn.closeChan)
                conn.isClosed = true
        }
        conn.mutex.Unlock()
}

//内部实现
func (conn *Connection) readLoop() {
        var (
                data []byte
                err error
        )
        for {
                if _, data, err = conn.wsConn.ReadMessage(); err != nil {
                        goto ERR
                }

                //阻塞在这里,等待inChan有空位置
                //但是如果writeLoop连接关闭了,这边无法得知
                //conn.inChan <- data

                select {
                case conn.inChan <- data:
                case <-conn.closeChan:
                        //closeChan关闭的时候,会进入此分支
                        goto ERR
                }
        }
        ERR:
                conn.Close()
}

func (conn *Connection) writeLoop() {
        var (
                data []byte
                err error
        )
        for {
                select {
                case data = <- conn.outChan:
                case <- conn.closeChan:
                        goto ERR

                }

                if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
                        goto ERR
                }
                conn.outChan <- data
        }
        ERR:
                conn.Close()
}