DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包
粘包
使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。
粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。
分包
在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。
应用层数据包格式如下:
应用层数据包格式
数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;
02. const ACount: Cardinal);
03.begin
04. case AIocpRecord.IocpOperate of
05. ioNone: Exit;
06. ioRead: //收到数据
07. begin
08. FActiveTime := Now;
09. ReceiveData(AIocpRecord.WsaBuf.buf, ACount);
10. if FConnected then
11. PreRecv(AIocpRecord); //投递请求
12. end;
13. ioWrite: //发送数据完成,需要释放AIocpRecord的指针
14. begin
15. FActiveTime := Now;
16. FSendOverlapped.Release(AIocpRecord);
17. end;
18. ioStream:
19. begin
20. FActiveTime := Now;
21. FSendOverlapped.Release(AIocpRecord);
22. WriteStream; //继续发送流
23. end;
24. end;
25.end;
如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);
02.begin
03. FInputBuf.Write(AData^, ALen);
04. Process;
05.end;
Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSocketHandle.Process;
02.var
03. AData, ALast, NewBuf: PByte;
04. iLenOffset, iOffset, iReserveLen: Integer;
05.
06. function ReadLen: Integer;
07. var
08. wLen: Word;
09. cLen: Cardinal;
10. begin
11. FInputBuf.Position := iOffset;
12. if FLenType = ltWord then
13. begin
14. FInputBuf.Read(wLen, SizeOf(wLen));
15. //wLen := ntohs(wLen);
16. Result := wLen;
17. end
18. else
19. begin
20. FInputBuf.Read(cLen, SizeOf(cLen));
21. //cLen := ntohl(cLen);
22. Result := cLen;
23. end;
24. end;
25.begin
26. case FLenType of
27. ltWord, ltCardinal:
28. begin
29. if FLenType = ltWord then
30. iLenOffset := 2
31. else
32. iLenOffset := 4;
33. iReserveLen := 0;
34. FPacketLen := 0;
35. iOffset := 0;
36. if FPacketLen <= 0 then
37. begin
38. if FInputBuf.Size < iLenOffset then Exit;
39. FInputBuf.Position := 0; //移动到最前面
40. FPacketLen := ReadLen;
41. iOffset := iLenOffset;
42. iReserveLen := FInputBuf.Size - iOffset;
43. if FPacketLen > iReserveLen then //不够一个包的长度
44. begin
45. FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据
46. FPacketLen := 0;
47. Exit;
48. end;
49. end;
50. while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理
51. begin //多个包循环处理
52. AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针
53. Execute(AData, FPacketLen);
54. iOffset := iOffset + FPacketLen; //移到下一个点
55. FPacketLen := 0;
56. iReserveLen := FInputBuf.Size - iOffset;
57. if iReserveLen > iLenOffset then //剩下的数据
58. begin
59. FPacketLen := ReadLen;
60. iOffset := iOffset + iLenOffset;
61. iReserveLen := FInputBuf.Size - iOffset;
62. if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退
63. begin
64. iOffset := iOffset - iLenOffset;
65. iReserveLen := FInputBuf.Size - iOffset;
66. FPacketLen := 0;
67. end;
68. end
69. else //不够长度字节数
70. FPacketLen := 0;
71. end;
72. if iReserveLen > 0 then //把剩下的自己缓存起来
73. begin
74. ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);
75. GetMem(NewBuf, iReserveLen);
76. try
77. CopyMemory(NewBuf, ALast, iReserveLen);
78. FInputBuf.Clear;
79. FInputBuf.Write(NewBuf^, iReserveLen);
80. finally
81. FreeMemory(NewBuf);
82. end;
83. end
84. else
85. begin
86. FInputBuf.Clear;
87. end;
88. end;
89. else
90. begin
91. FInputBuf.Position := 0;
92. AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针
93. Execute(AData, FInputBuf.Size);
94. FInputBuf.Clear;
95. end;
96. end;
97.end;
解包
由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:
命令长度Len:Cardinal(4字节无符号整数) 命令 数据
这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.function TBaseSocket.DecodePacket(APacketData: PByte;
02. const ALen: Integer): Boolean;
03.var
04. CommandLen: Integer;
05. UTF8Command: UTF8String;
06.begin
07. if ALen > 4 then //命令长度为4字节,因而长度必须大于4
08. begin
09. CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度
10. Inc(APacketData, SizeOf(Cardinal));
11. SetLength(UTF8Command, CommandLen);
12. CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令
13. Inc(APacketData, CommandLen);
14. FRequestData := APacketData; //数据
15. FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度
16. FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi
17. Result := True;
18. end
19. else
20. Result := False;
21.end;
具体每个协议可以集成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.{* SQL查询SOCKET基类 *}
02.TSQLSocket = class(TBaseSocket)
03.private
04. {* 开始事务创建TADOConnection,关闭事务时释放 *}
05. FBeginTrans: Boolean;
06. FADOConn: TADOConnection;
07.protected
08. {* 处理数据接口 *}
09. procedure Execute(AData: PByte; const ALen: Cardinal); override;
10. {* 返回SQL语句执行结果 *}
11. procedure DoCmdSQLOpen;
12. {* 执行SQL语句 *}
13. procedure DoCmdSQLExec;
14. {* 开始事务 *}
15. procedure DoCmdBeginTrans;
16. {* 提交事务 *}
17. procedure DoCmdCommitTrans;
18. {* 回滚事务 *}
19. procedure DoCmdRollbackTrans;
20.public
21. procedure DoCreate; override;
22. destructor Destroy; override;
23. {* 获取SQL语句 *}
24. function GetSQL: string;
25. property BeginTrans: Boolean read FBeginTrans;
26.end;
Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:
[delphi] view plaincopy在CODE上查看代码片派生到我的代码片
01.procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);
02.var
03. sErr: string;
04.begin
05. inherited;
06. FRequest.Clear;
07. FResponse.Clear;
08. try
09. AddResponseHeader;
10. if ALen = 0 then
11. begin
12. DoFailure(CIPackLenError);
13. DoSendResult;
14. Exit;
15. end;
16. if DecodePacket(AData, ALen) then
17. begin
18. FResponse.Clear;
19. AddResponseHeader;
20. case StrToSQLCommand(Command) of
21. scLogin:
22. begin
23. DoCmdLogin;
24. DoSendResult;
25. end;
26. scActive:
27. begin
28. DoSuccess;
29. DoSendResult;
30. end;
31. scSQLOpen:
32. begin
33. DoCmdSQLOpen;
34. end;
35. scSQLExec:
36. begin
37. DoCmdSQLExec;
38. DoSendResult;
39. end;
40. scBeginTrans:
41. begin
42. DoCmdBeginTrans;
43. DoSendResult;
44. end;
45. scCommitTrans:
46. begin
47. DoCmdCommitTrans;
48. DoSendResult;
49. end;
50. scRollbackTrans:
51. begin
52. DoCmdRollbackTrans;
53. DoSendResult;
54. end;
55. else
56. DoFailure(CINoExistCommand, 'Unknow Command');
57. DoSendResult;
58. end;
59. end
60. else
61. begin
62. DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');
63. DoSendResult;
64. end;
65. except
66. on E: Exception do //发生未知错误,断开连接
67. begin
68. sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;
69. WriteLogMsg(ltError, sErr);
70. Disconnect;
71. end;
72. end;
73.end;
更详细代码见示例代码的IOCPSocket单元。
V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。