DELPHI高性能大容量SOCKET并发,四:粘包、分包、解包

DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包

粘包


使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。


分包


在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

应用层数据包格式如下:



应用层数据包格式   
数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len 
IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;  
02.  const ACount: Cardinal);  
03.begin  
04.  case AIocpRecord.IocpOperate of  
05.    ioNone: Exit;  
06.    ioRead: //收到数据  
07.    begin  
08.      FActiveTime := Now;  
09.      ReceiveData(AIocpRecord.WsaBuf.buf, ACount);  
10.      if FConnected then  
11.        PreRecv(AIocpRecord); //投递请求  
12.    end;  
13.    ioWrite: //发送数据完成,需要释放AIocpRecord的指针  
14.    begin  
15.      FActiveTime := Now;  
16.      FSendOverlapped.Release(AIocpRecord);  
17.    end;  
18.    ioStream:  
19.    begin  
20.      FActiveTime := Now;  
21.      FSendOverlapped.Release(AIocpRecord);  
22.      WriteStream; //继续发送流  
23.    end;  
24.  end;  
25.end;  

如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低) 




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);  
02.begin  
03.  FInputBuf.Write(AData^, ALen);  
04.  Process;  
05.end;  

Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下: 




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.Process;  
02.var  
03.  AData, ALast, NewBuf: PByte;  
04.  iLenOffset, iOffset, iReserveLen: Integer;  
05.  
06.  function ReadLen: Integer;  
07.  var  
08.    wLen: Word;  
09.    cLen: Cardinal;  
10.  begin  
11.    FInputBuf.Position := iOffset;  
12.    if FLenType = ltWord then  
13.    begin  
14.      FInputBuf.Read(wLen, SizeOf(wLen));  
15.      //wLen := ntohs(wLen);  
16.      Result := wLen;  
17.    end  
18.    else  
19.    begin  
20.      FInputBuf.Read(cLen, SizeOf(cLen));  
21.      //cLen := ntohl(cLen);  
22.      Result := cLen;  
23.    end;  
24.  end;  
25.begin  
26.  case FLenType of  
27.    ltWord, ltCardinal:  
28.    begin  
29.      if FLenType = ltWord then  
30.        iLenOffset := 2  
31.      else  
32.        iLenOffset := 4;  
33.      iReserveLen := 0;  
34.      FPacketLen := 0;  
35.      iOffset := 0;  
36.      if FPacketLen <= 0 then  
37.      begin  
38.        if FInputBuf.Size < iLenOffset then Exit;  
39.        FInputBuf.Position := 0; //移动到最前面  
40.        FPacketLen := ReadLen;  
41.        iOffset := iLenOffset;  
42.        iReserveLen := FInputBuf.Size - iOffset;  
43.        if FPacketLen > iReserveLen then //不够一个包的长度  
44.        begin  
45.          FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据  
46.          FPacketLen := 0;  
47.          Exit;  
48.        end;  
49.      end;  
50.      while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理  
51.      begin //多个包循环处理  
52.        AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针  
53.        Execute(AData, FPacketLen);  
54.        iOffset := iOffset + FPacketLen; //移到下一个点  
55.        FPacketLen := 0;  
56.        iReserveLen := FInputBuf.Size - iOffset;  
57.        if iReserveLen > iLenOffset then //剩下的数据  
58.        begin  
59.          FPacketLen := ReadLen;  
60.          iOffset := iOffset + iLenOffset;  
61.          iReserveLen := FInputBuf.Size - iOffset;  
62.          if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退  
63.          begin  
64.            iOffset := iOffset - iLenOffset;  
65.            iReserveLen := FInputBuf.Size - iOffset;  
66.            FPacketLen := 0;  
67.          end;  
68.        end  
69.        else //不够长度字节数  
70.          FPacketLen := 0;  
71.      end;  
72.      if iReserveLen > 0 then //把剩下的自己缓存起来  
73.      begin  
74.        ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);  
75.        GetMem(NewBuf, iReserveLen);  
76.        try  
77.          CopyMemory(NewBuf, ALast, iReserveLen);  
78.          FInputBuf.Clear;  
79.          FInputBuf.Write(NewBuf^, iReserveLen);  
80.        finally  
81.          FreeMemory(NewBuf);  
82.        end;  
83.      end  
84.      else  
85.      begin  
86.        FInputBuf.Clear;  
87.      end;  
88.    end;  
89.  else  
90.    begin  
91.      FInputBuf.Position := 0;  
92.      AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针  
93.      Execute(AData, FInputBuf.Size);  
94.      FInputBuf.Clear;  
95.    end;  
96.  end;  
97.end;  



解包


由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:



命令长度Len:Cardinal(4字节无符号整数) 命令 数据 
这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.function TBaseSocket.DecodePacket(APacketData: PByte;  
02.  const ALen: Integer): Boolean;  
03.var  
04.  CommandLen: Integer;  
05.  UTF8Command: UTF8String;  
06.begin  
07.  if ALen > 4 then //命令长度为4字节,因而长度必须大于4  
08.  begin  
09.    CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度  
10.    Inc(APacketData, SizeOf(Cardinal));  
11.    SetLength(UTF8Command, CommandLen);  
12.    CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令  
13.    Inc(APacketData, CommandLen);  
14.    FRequestData := APacketData; //数据  
15.    FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度  
16.    FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi  
17.    Result := True;  
18.  end  
19.  else  
20.    Result := False;   
21.end;  

具体每个协议可以集成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下: 




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.{* SQL查询SOCKET基类 *}  
02.TSQLSocket = class(TBaseSocket)  
03.private  
04.  {* 开始事务创建TADOConnection,关闭事务时释放 *}  
05.  FBeginTrans: Boolean;  
06.  FADOConn: TADOConnection;  
07.protected  
08.  {* 处理数据接口 *}  
09.  procedure Execute(AData: PByte; const ALen: Cardinal); override;  
10.  {* 返回SQL语句执行结果 *}  
11.  procedure DoCmdSQLOpen;  
12.  {* 执行SQL语句 *}  
13.  procedure DoCmdSQLExec;  
14.  {* 开始事务 *}  
15.  procedure DoCmdBeginTrans;  
16.  {* 提交事务 *}  
17.  procedure DoCmdCommitTrans;  
18.  {* 回滚事务 *}  
19.  procedure DoCmdRollbackTrans;  
20.public  
21.  procedure DoCreate; override;  
22.  destructor Destroy; override;  
23.  {* 获取SQL语句 *}  
24.  function GetSQL: string;  
25.  property BeginTrans: Boolean read FBeginTrans;  
26.end;  

Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下: 




[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);  
02.var  
03.  sErr: string;  
04.begin  
05.  inherited;  
06.  FRequest.Clear;  
07.  FResponse.Clear;  
08.  try  
09.    AddResponseHeader;  
10.    if ALen = 0 then  
11.    begin  
12.      DoFailure(CIPackLenError);  
13.      DoSendResult;  
14.      Exit;  
15.    end;  
16.    if DecodePacket(AData, ALen) then  
17.    begin  
18.      FResponse.Clear;  
19.      AddResponseHeader;  
20.      case StrToSQLCommand(Command) of  
21.        scLogin:  
22.        begin  
23.          DoCmdLogin;  
24.          DoSendResult;  
25.        end;  
26.        scActive:  
27.        begin  
28.          DoSuccess;  
29.          DoSendResult;  
30.        end;  
31.        scSQLOpen:  
32.        begin  
33.          DoCmdSQLOpen;  
34.        end;  
35.        scSQLExec:  
36.        begin  
37.          DoCmdSQLExec;  
38.          DoSendResult;  
39.        end;  
40.        scBeginTrans:  
41.        begin  
42.          DoCmdBeginTrans;  
43.          DoSendResult;  
44.        end;  
45.        scCommitTrans:  
46.        begin  
47.          DoCmdCommitTrans;  
48.          DoSendResult;  
49.        end;  
50.        scRollbackTrans:  
51.        begin  
52.          DoCmdRollbackTrans;  
53.          DoSendResult;  
54.        end;  
55.      else  
56.        DoFailure(CINoExistCommand, 'Unknow Command');  
57.        DoSendResult;  
58.      end;  
59.    end  
60.    else  
61.    begin  
62.      DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');  
63.      DoSendResult;  
64.    end;  
65.  except  
66.    on E: Exception do //发生未知错误,断开连接  
67.    begin  
68.      sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;  
69.      WriteLogMsg(ltError, sErr);  
70.      Disconnect;  
71.    end;  
72.  end;  
73.end;  



更详细代码见示例代码的IOCPSocket单元。


V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。