go 客户端服务端通信

client.go

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "hash/crc32"
    "math/rand"
    "net"
    "os"
    // "sync"
    "time"
)

//数据包类型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET     = 0x01
)

//默认的服务器地址
var (
    server = "127.0.0.1:8080"
)

//数据包
type Packet struct {
    PacketType    byte
    PacketContent []byte
}

//心跳包
type HeartPacket struct {
    Version   string `json:"version"`
    Timestamp int64  `json:"timestamp"`
}

//数据包
type ReportPacket struct {
    Content   string `json:"content"`
    Rand      int    `json:"rand"`
    Timestamp int64  `json:"timestamp"`
}

//注册
type RegisterReq struct {
    PERAESKey  string `json:"PERAESKey"`
    VIN        string `json:"VIN"`
    T_Box_SN   string `json:"T_Box_SN"`
    IMSI       string `json:"IMSI"`
    rollNumber string `json:"rollNumber"`
}

//客户端对象
type TcpClient struct {
    connection *net.TCPConn
    hawkServer *net.TCPAddr
    stopChan   chan struct{}
}

func main() {
    //拿到服务器地址信息
    hawkServer, err := net.ResolveTCPAddr("tcp", server)
    if err != nil {
        fmt.Printf("hawk server [%s] resolve error: [%s]", server, err.Error())
        os.Exit(1)
    }
    //连接服务器
    connection, err := net.DialTCP("tcp", nil, hawkServer)
    if err != nil {
        fmt.Printf("connect to hawk server error: [%s]", err.Error())
        os.Exit(1)
    }
    client := &TcpClient{
        connection: connection,
        hawkServer: hawkServer,
        stopChan:   make(chan struct{}),
    }
    //启动接收
    go client.receivePackets()

    //发送心跳的goroutine
    /*go func() {
        heartBeatTick := time.Tick(2 * time.Second)
        for {
            select {
            case <-heartBeatTick:
                client.sendHeartPacket()
            case <-client.stopChan:
                return
            }
        }
    }()*/

    //测试用的,开300个goroutine每秒发送一个包
    // for i := 0; i < 1; i++ {
    go func() {
        sendTimer := time.After(5 * time.Second)
        for {
            select {
            case <-sendTimer:
                client.sendReportPacket()
                sendTimer = time.After(1 * time.Second)
            case <-client.stopChan:
                return
            }
        }
    }()
    // }
    //等待退出
    <-client.stopChan
}

// 接收数据包
func (client *TcpClient) receivePackets() {
    reader := bufio.NewReader(client.connection)
    for {
        //承接上面说的服务器端的偷懒,我这里读也只是以\n为界限来读区分包
        msg, err := reader.ReadString('\n')
        if err != nil {
            //在这里也请处理如果服务器关闭时的异常
            close(client.stopChan)
            break
        }
        fmt.Print(msg)
    }
}

//发送数据包
//仔细看代码其实这里做了两次json的序列化,有一次其实是不需要的
func (client *TcpClient) sendReportPacket() {
    registPacket := RegisterReq{
        PERAESKey:  "123456",
        VIN:        "abcdef",
        T_Box_SN:   "abcdef123456",
        IMSI:       "IMSI",
        rollNumber: getRandString(),
        /*Content:   getRandString(),
        Timestamp: time.Now().Unix(),
        Rand:      rand.Int(),*/
    }
    fmt.Println("registPacket:", registPacket)
    packetBytes, err := json.Marshal(registPacket) //返回值是字节数组byte[],
    if err != nil {
        fmt.Println(err.Error())
    }
    //这一次其实可以不需要,在封包的地方把类型和数据传进去即可
    /*packet := Packet{
        PacketType:    REPORT_PACKET,
        PacketContent: packetBytes,
    }
    sendBytes, err := json.Marshal(packet)
    if err != nil {
        fmt.Println(err.Error())
    }*/
    //发送

    client.connection.Write(EnPackSendData(packetBytes))
    // fmt.Println("EnPackSendData(packetBytes):%v", EnPackSendData(packetBytes))
    // fmt.Println("Send metric data success!")
}

//使用的协议与服务器端保持一致
func EnPackSendData(sendBytes []byte) []byte {
    packetLength := len(sendBytes) + 8
    result := make([]byte, packetLength)
    result[0] = 0xFF
    result[1] = 0xFF
    result[2] = byte(uint16(len(sendBytes)) >> 8) //除以2的8次方,byte是0-255,
    result[3] = byte(uint16(len(sendBytes)) & 0xFF)
    copy(result[4:], sendBytes)
    sendCrc := crc32.ChecksumIEEE(sendBytes)
    result[packetLength-4] = byte(sendCrc >> 24)
    result[packetLength-3] = byte(sendCrc >> 16 & 0xFF)
    result[packetLength-2] = 0xFF
    result[packetLength-1] = 0xFE
    fmt.Println(result)
    return result
}

//发送心跳包,与发送数据包一样
func (client *TcpClient) sendHeartPacket() {
    heartPacket := HeartPacket{
        Version:   "1.0",
        Timestamp: time.Now().Unix(),
    }
    packetBytes, err := json.Marshal(heartPacket)
    if err != nil {
        fmt.Println(err.Error())
    }
    packet := Packet{
        PacketType:    HEART_BEAT_PACKET,
        PacketContent: packetBytes,
    }
    sendBytes, err := json.Marshal(packet)
    if err != nil {
        fmt.Println(err.Error())
    }
    client.connection.Write(EnPackSendData(sendBytes))
    fmt.Println("Send heartbeat data success!")
}

//拿一串随机字符
func getRandString() string {
    // length := rand.Intn(10)
    strBytes := make([]byte, 10)
    for i := 0; i < 10; i++ {
        strBytes[i] = byte(rand.Intn(26) + 97)
    }
    return string(strBytes)
}

/*作者:getyouyou
链接:https://www.jianshu.com/p/dbc62a879081
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。*/

server.go

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "hash/crc32"
    "io"
    "net"
    "os"
)

//数据包的类型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET     = 0x01
)

var (
    server = "127.0.0.1:8080"
)

//这里是包的结构体,其实是可以不需要的
type Packet struct {
    PacketType    byte
    PacketContent []byte
}

//注册
type RegisterReq struct {
    PERAESKey  string `json:"PERAESKey"`
    VIN        string `json:"VIN"`
    T_Box_SN   string `json:"T_Box_SN"`
    IMSI       string `json:"IMSI"`
    rollNumber string `json:"rollNumber"`
}

//心跳包,这里用了json来序列化,也可以用github上的gogo/protobuf包
//具体见(https://github.com/gogo/protobuf)
type HeartPacket struct {
    Version   string `json:"version"`
    Timestamp int64  `json:"timestamp"`
}

//正式上传的数据包
type ReportPacket struct {
    Content   string `json:"content"`
    Rand      int    `json:"rand"`
    Timestamp int64  `json:"timestamp"`
}

//与服务器相关的资源都放在这里面
type TcpServer struct {
    listener   *net.TCPListener
    hawkServer *net.TCPAddr
}

func main() {
    //类似于初始化套接字,绑定端口
    hawkServer, err := net.ResolveTCPAddr("tcp", server)
    checkErr(err)
    //侦听
    listen, err := net.ListenTCP("tcp", hawkServer)
    checkErr(err)
    //记得关闭
    defer listen.Close()
    tcpServer := &TcpServer{
        listener:   listen,
        hawkServer: hawkServer,
    }
    fmt.Println("start server successful......")
    //开始接收请求
    for {
        conn, err := tcpServer.listener.Accept()
        fmt.Println("accept tcp client %s", conn.RemoteAddr().String())
        checkErr(err)
        // 每次建立一个连接就放到单独的协程内做处理
        go Handle(conn)
    }
}

//处理函数,这是一个状态机
//根据数据包来做解析
//数据包的格式为|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE
//其中len为data的长度,实际长度为len(高)*256+len(低)
//CRC为32位CRC,取了最高16位共2Bytes
//0xFF|0xFF和0xFF|0xFE类似于前导码
func Handle(conn net.Conn) {
    // close connection before exit
    defer conn.Close()
    // 状态机状态
    state := 0x00
    // 数据包长度
    length := uint16(0)
    // crc校验和
    crc16 := uint16(0)
    var recvBuffer []byte
    // 游标
    cursor := uint16(0)
    bufferReader := bufio.NewReader(conn)
    //状态机处理数据
    for {
        recvByte, err := bufferReader.ReadByte() //recvByte是每次读到的字节
        if err != nil {
            //这里因为做了心跳,所以就没有加deadline时间,如果客户端断开连接
            //这里ReadByte方法返回一个io.EOF的错误,具体可考虑文档
            /*Handle方法在一个死循环中使用了一个无阻塞的buff来读取套接字中的数据,
              因此当客户端主动关闭连接时,如果不对这个io.EOF进行处理,会导致这个goroutine空转,
              疯狂吃cpu,在这里io.EOF的处理非常重要:)*/
            if err == io.EOF {
                fmt.Printf("client %s is close!\n", conn.RemoteAddr().String())
            }
            //在这里直接退出goroutine,关闭由defer操作完成
            return
        }
        //进入状态机,根据不同的状态来处理
        switch state {
        case 0x00:
            if recvByte == 0xFF {
                state = 0x01
                //初始化状态机
                recvBuffer = nil
                length = 0
                crc16 = 0
            } else {
                state = 0x00
            }
            break
        case 0x01:
            if recvByte == 0xFF {
                state = 0x02
            } else {
                state = 0x00
            }
            break
        case 0x02:
            length += uint16(recvByte) * 256      //length这次是发送数据的长度
            fmt.Println("0x02,length:%d", length) //0
            state = 0x03
            break
        case 0x03:
            length += uint16(recvByte)
            fmt.Println("0x03,length:%d", length) //77
            // 一次申请缓存,初始化游标,准备读数据
            recvBuffer = make([]byte, length)
            cursor = 0
            state = 0x04
            break
        case 0x04:
            //不断地在这个状态下读数据,直到满足长度为止
            recvBuffer[cursor] = recvByte
            cursor++
            if cursor == length {
                state = 0x05
            }
            break
        case 0x05:
            crc16 += uint16(recvByte) * 256 //crc32编码
            state = 0x06
            break
        case 0x06:
            crc16 += uint16(recvByte)
            state = 0x07
            break
        case 0x07:
            if recvByte == 0xFF {
                state = 0x08
            } else {
                state = 0x00
            }
        case 0x08:
            if recvByte == 0xFE {
                //执行数据包校验
                if (crc32.ChecksumIEEE(recvBuffer)>>16)&0xFFFF == uint32(crc16) {
                    var packet RegisterReq
                    //把拿到的数据反序列化出来
                    json.Unmarshal(recvBuffer, &packet)
                    //新开协程处理数据
                    go processRecvData(&packet, conn)
                } else {
                    fmt.Println("丢弃数据!")
                }
            }
            //状态机归位,接收下一个包
            state = 0x00
        }
    }
}

//在这里处理收到的包,就和一般的逻辑一样了,根据类型进行不同的处理,因人而异
//我这里处理了心跳和一个上报数据包
//服务器往客户端的数据包很简单地以\n换行结束了,偷了一个懒:),正常情况下也可根据自己的协议来封装好
//然后在客户端写一个状态来处理
func processRecvData(packet *RegisterReq, conn net.Conn) {
    // switch packet.PacketType {
    // case HEART_BEAT_PACKET:
    // var beatPacket HeartPacket
    // json.Unmarshal(packet.PacketContent, &beatPacket)
    fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), packet)
    conn.Write([]byte("heartBeat\n"))
    return
    // case REPORT_PACKET:
    //     var reportPacket ReportPacket
    //     json.Unmarshal(packet.PacketContent, &reportPacket)
    //     fmt.Printf("recieve report data from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), reportPacket)
    //     conn.Write([]byte("Report data has recive\n"))
    //     return
    // }
}

//处理错误,根据实际情况选择这样处理,还是在函数调之后不同的地方不同处理
func checkErr(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }
}

/*作者:getyouyou
链接:https://www.jianshu.com/p/dbc62a879081
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。*/