博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
高性能的 socket 通讯服务器(完成端口模型--IOCP)
阅读量:5790 次
发布时间:2019-06-18

本文共 31563 字,大约阅读时间需要 105 分钟。

hot3.png

很多人费尽心思,都没有找到一个完美的 I/O CP 例程,甚至给人以误解,现将本人编写的例程公布出来,希望给那些苦苦寻觅的人带来收获。本例程可以作为初学者的学习之用,亦可以作为大型服务程序的通讯模块。其处理速度可以说,优化到了极点。如果理解了本例程的精髓,加上一个高效的通讯协议,你完全可以用它来构建一个高性能的通讯服务器。    在公布代码前,先谈谈I/O CP。对I/O CP的函数不多做说明了,网上很多,都一样。在此本人仅说一些技术上要注意的问题。    一、如何管理内存  1、IO数据缓冲管理  动态分配内存,是一种灵活的方式。但对系统资源的浪费是巨大的。因此本人采用的是预先分配服务器最大需要的内存,用链表来管理。任何时候分配交还都不需要遍历,仅需要互斥而已。  更巧妙的是,将IO发送信息和内存块有机地结合在一起,减少了链表的管理工作。    //IO操作标志  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);  //IO操作信息  PIOInfo =^ TIOInfo;  TIOInfo = packed record  Overlapped: TOverlapped; //重叠结构  DataBuf: TWSABUF; //IO数据信息  Socket: TSocket;  Flag: TIOFlag;  TickCountSend: DWord;  Next: PIOInfo;  Prior: PIOInfo;  end;    PUNode =^ TUNode;  TUNode = record  Next: Pointer;  end;    PIOMem =^ TIOMem;  TIOMem = packed record  IOInfo: TIOInfo;  Data: array[1..IO_MEM_SIZE] of Byte;  //申请内存的时候,返回的是Data的地址  end;    2、链路数据管理  采用双向链表结构,减少删除节点时遍历消耗的时间    //每个连接的信息  PLink =^ TLink;  TLink = record  Socket: TSocket;  RemoteIP: string[30];  RemotePort: DWord;  //最后收到数据时的系统节拍  TickCountActive: DWord;  //处理该连接的当前线程的信息  Worker: PWorker;  Data: Pointer; //应用层可以设置这个成员,当OnReceive的时候,就不要每次遍历每个连接对应的数据区了  Section: TRTLCriticalSection;  Next: PLink;  Prior: PLink;  end;    二、如何管理线程  每个工作线程创建的时候,调用:OnWorkerThreadCreateEvt,该函数可以返回这个线程对应的信息,比如为该线程创建的数据库连接控件或对应的类等,在OnReceive中可以通过Link的Worker访问该成员Worker^.Data。    //工作线程信息  PWorker =^ TWorker;  TWorker = record  ID: THandle;  CompletionPort: THandle;  Data: Pointer; //调用OnWorkerThreadCreateEvt返回的值  //用于反应工作情况的数据  TickCountLong,  TickCountActive: DWord;  ExecCount: Integer;  //线程完成后设置  Finished: THandle;  Next: PWorker;  end;    同理,服务线程也具有一样的特点。详见源码。    关于线程同步,一直是众多程序员头疼的问题。在本例程中,尽量避免了过多的互斥,并有效地防止了死锁现象。用RTLCriticalSection,稍微不注意,就会造成死锁的灾难。哪怕是两行代码的差别,对多线程而言都是灾难性的。在本例程中,需要做数据同步的主要是链路链表的维护。服务线程需要计算哪个连接空闲超时了,工作线程需要处理断线情况,应用层主动发送数据时需要对该链路独占,否则一个在发送,一个在处理断线故障,就会发生冲突,导致灾难后果。    在本人的压力测试中,已经有效地解决了这个问题,应用层部分不需要做什么同步工作,可以安心地收发数据了。同时每个线程都支持了数据库连接。    三、到底要创建多少个工作线程合适  很多文章说,有N个CPU就创建N个线程,也有说N*2+2。最不喜欢说话不负责任的人了,本例程可以让刚入门 I/O CP 的人对它有更深入的了解。 
 例程测试结果:    四、该不该使用类  有人说,抛弃一切类,对于服务器而言,会为类付出很多代价,从我的观点看,为类付出代价的,主要是动态创建的原因。其实,类成员访问和结构成员访问一样,需要相对地址。如果都是预先创建的,两者没有多大的差别。本例程采用裸奔函数的方式,当然在应用层可以采用类来管理,很难想象,如果没有没有类,需要多做多少工作。    五、缺点  不能发大数据包,只能发不超过固定数的数据包。但对于小数据报而言,它将是优秀的。    时间原因,不能做太多的解释和对代码做太多的注释,需要例程源码的可以和本人联系,免费提供。QQ:48092788    例程源码:  http://d.download.csdn.net/down/1546336/guestcode    完成端口通讯服务模块源码:  {******************************************************************************  * UCode 系列组件、控件 *  * 作者:卢益贵 2003~2009 *  * 版权所有 任何未经授权的使用和销售,均保留追究法律责任的权力 *  * *  * UCode 系列由XCtrls-YCtrls-ICtrls-NCode系列演变而来 *  * QQ:48092788 luyigui.blog.gxsky.com *  ******************************************************************************}  {******************************************************************************  完成端口模型的socket服务器  ******************************************************************************}  unit UTcpServer;  interface  uses  Windows, Classes, UClasses, UWinSock2;  const  //每个IO缓冲区的大小  IO_MEM_SIZE = 2048;  //内存要足够用,可视情况设置  IO_MEM_MAX_COUNT = 1000 * 10;  //最大连接数  SOCK_MAX_COUNT = 3000;  //连接空闲实现,超过这个时间未收到客户端数据则关闭  SOCK_IDLE_OVERTIME = 60;  type  //工作线程信息  PWorker =^ TWorker;  TWorker = record  ID: THandle;  CompletionPort: THandle;  Data: Pointer;  //用于反应工作情况的数据  TickCountLong,  TickCountActive: DWord;  ExecCount: Integer;  //线程完成后设置  Finished: THandle;  Next: PWorker;  end;  //每个连接的信息  PLink =^ TLink;  TLink = record  Socket: TSocket;  RemoteIP: string[30];  RemotePort: DWord;  //最后收到数据时的系统节拍  TickCountActive: DWord;  //处理该连接的当前线程的信息  Worker: PWorker;  Data: Pointer;  Section: TRTLCriticalSection;  Next: PLink;  Prior: PLink;  end;  TOnLinkIdleOvertimeEvt = procedure(Link: PLink);  TOnDisconnectEvt = procedure(Link: PLink);  TOnReceiveEvt = function(Link: PLink; Buf: PByte; Len: Integer): Boolean;  TOnThreadCreateEvt = function(IsWorkerThread: Boolean): Pointer;  //取得链路链表使用情况X%  function GetLinkUse(): real;  //链路链表所占内存  function GetLinkSize(): Integer;  //当前链路数  function GetLinkCount(): Integer;  
//空闲链路数  function GetLinkFree(): Integer;  //IO内存使用情况  function GetIOMemUse(): Real;  //IO内存链表占内存数  function GetIOMemSize(): Integer;  //IO内存空闲数  function GetIOMemFree(): Integer;  //交还一个IO内存  procedure FreeIOMem(Mem: Pointer);  //获取一个IO内存区  function GetIOMem(): Pointer;  //获取工作线程的工作情况  function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;  //获取工作线程的ID  function GetWorkerID(Index: Integer): Integer;  //获取工作线程数量  function GetWorkerCount(): Integer;  //打开一个IP端口,并监听  function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;  //停止并关闭一个IP端口  function StopTcpServer(): Boolean;  //设置响应事件的函数指针,在StartTcpServer之前调用  procedure SetEventProc(OnReceive: TOnReceiveEvt;  OnDisconnect: TOnDisconnectEvt;  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;  OnServerThreadCreate: TOnThreadCreateEvt;  OnWorkerThreadCreate: TOnThreadCreateEvt);  //写日志文件  procedure WriteLog(Log: String);  function PostRecv(Link: PLink; IOMem: Pointer): Boolean;  //抛出一个发送事件  function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;  //广播数据到所有的链路对方  procedure PostBroadcast(Buf: PByte; Len: Integer);  //当前是否打开  function IsTcpServerActive(): Boolean;  //获取服务线程最后一次工作所占的时间(MS)  function GetServerExecLong(): DWord;  //获取服务线程工作次数  function GetServerExecCount(): Integer;  //获取本地或对外IP地址  function GetLocalIP(IsIntnetIP: Boolean): String;  implementation  uses  IniFiles, SysUtils, ActiveX;  var  ExePath: String = '';  const  HEAP_NO_SERIALIZE = 1; {非互斥, 此标记可允许多个线程同时访问此堆}  HEAP_GENERATE_EXCEPTIONS = 4; {当建立堆出错时, 此标记可激发一个异常并返回异常标识}  HEAP_ZERO_MEMORY = 8; {把分配的内存初始化为 0}  HEAP_REALLOC_IN_PLACE_ONLY = 16; {此标记不允许改变原来的内存位置}  STATUS_ACCESS_VIOLATION = DWORD($C0000005); {参数错误}  STATUS_NO_MEMORY = DWORD($C0000017); {内存不足}  {===============================================================================  IO内存管理  ================================================================================}  type  //IO操作标志  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);  //IO操作信息  PIOInfo =^ TIOInfo;  
TIOInfo = packed record  Overlapped: TOverlapped; //重叠结构  DataBuf: TWSABUF; //IO数据信息  Socket: TSocket;  Flag: TIOFlag;  TickCountSend: DWord;  Next: PIOInfo;  Prior: PIOInfo;  end;    PUNode =^ TUNode;  TUNode = record  Next: Pointer;  end;    PIOMem =^ TIOMem;  TIOMem = packed record  IOInfo: TIOInfo;  Data: array[1..IO_MEM_SIZE] of Byte;  end;  var  IOMemHead: PIOMem = nil;  IOMemLast: PIOMem = nil;  IOMemUse: Integer = 0;  IOMemSec: TRTLCriticalSection;  IOMemList: array[1..IO_MEM_MAX_COUNT] of Pointer;  function GetIOMem(): Pointer;  begin  //内存要足够用,如果不够,即使是动态分配,神仙也救不了  EnterCriticalSection(IOMemSec);  try  try  Result := @(IOMemHead^.Data);  IOMemHead := PUNode(IOMemHead)^.Next;  IOMemUse := IOMemUse + 1;  except  Result := nil;  WriteLog('GetIOMem: error');  end;  finally  LeaveCriticalSection(IOMemSec);  end;  end;  procedure FreeIOMem(Mem: Pointer);  begin  EnterCriticalSection(IOMemSec);  try  try  Mem := Pointer(Integer(Mem) - sizeof(TIOInfo));  PUNode(Mem).Next := nil;  PUNode(IOMemLast)^.Next := Mem;  IOMemLast := Mem;  IOMemUse := IOMemUse - 1;  except  WriteLog('FreeIOMem: error');  end;  finally  LeaveCriticalSection(IOMemSec);  end;  end;  procedure IniIOMem();  var  i: Integer;  Heap: THandle;  begin  InitializeCriticalSection(IOMemSec);  IOMemHead := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TIOMem));  IOMemLast := IOMemHead;  IOMemList[1] := IOMemHead;  Heap := GetProcessHeap();  for i := 2 to IO_MEM_MAX_COUNT do  begin  PUNode(IOMemLast)^.Next := HeapAlloc(Heap, HEAP_ZERO_MEMORY, sizeof(TIOMem));  IOMemList[i] := PUNode(IOMemLast)^.Next;  IOMemLast := PUNode(IOMemLast)^.Next;  end;  PUNode(IOMemLast).Next := nil;  end;  function GetIOMemFree(): Integer;  var  IOMems: PUNode;  begin  EnterCriticalSection(IOMemSec);  Result := 0;  IOMems := PUNode(IOMemHead);  while IOMems nil do  begin  Result := Result + 1;  IOMems := IOMems^.Next;  end;  LeaveCriticalSection(IOMemSec);  end;  procedure DeleteIOMem();  var  i: Integer;  Heap: THandle; 
 begin  Heap := GetProcessHeap();  for i := 1 to IO_MEM_MAX_COUNT do  HeapFree(Heap, HEAP_NO_SERIALIZE, IOMemList[i]);  IOMemUse := 0;  DeleteCriticalSection(IOMemSec);  end;  function GetIOMemSize(): Integer;  begin  Result := IO_MEM_MAX_COUNT * sizeof(TIOMem);  end;  function GetIOMemUse(): Real;  begin  Result := (IOMemUse * 100) / IO_MEM_MAX_COUNT;  end;  {===============================================================================  Socket链路管理  ================================================================================}  procedure OnLinkIdleOvertimeDef(Link: PLink);  begin  end;  var  LinkHead: PLink = nil;  LinkLast: PLink = nil;  LinkUse: Integer = 0;  LinkCount: Integer = 0;  LinkSec: TRTLCriticalSection;  LinkList: array[1..SOCK_MAX_COUNT] of PLink;  OnLinkIdleOvertimeEvt: TOnLinkIdleOvertimeEvt = OnLinkIdleOvertimeDef;  LinksHead: PLink = nil;  LinksLast: PLink = nil;  function GetLinkFree(): Integer;  var  Links: PLink;  begin  EnterCriticalSection(LinkSec);  Result := 0;  Links := LinkHead;  while Links nil do  begin  Result := Result + 1;  Links := Links^.Next;  end;  LeaveCriticalSection(LinkSec);  end;  function GetLink(): PLink;  begin  try  //内存要足够用,如果不够,即使是动态分配,神仙也救不了  Result := LinkHead;  LinkHead := LinkHead^.Next;  LinkUse := LinkUse + 1;  LinkCount := LinkCount + 1;  if LinksHead = nil then  begin  LinksHead := Result;  LinksHead^.Next := nil;  LinksHead^.Prior := nil;  LinksLast := LinksHead;  end else  begin  Result^.Prior := LinksLast;  LinksLast^.Next := Result;  LinksLast := Result;  LinksLast^.Next := nil;  end;  with Result^ do  begin  Socket := INVALID_SOCKET;  RemoteIP := '';  RemotePort := 0;  TickCountActive := GetTickCount();  Worker := nil;  Data := nil;  end;  except  Result := nil;  WriteLog('GetLink: error');  end;  end;  procedure FreeLink(Link: PLink);  begin  try  with Link^ do  begin  Link^.Worker := nil;  if Link = LinksHead then  begin  LinksHead := Next;  if LinksLast = Link then  LinksLast := LinksHead  else  
LinksHead^.Prior := nil;  end else  begin  Prior^.Next := Next;  if Next nil then  Next^.Prior := Prior;  if Link = LinksLast then  LinksLast := Prior;  end;  Next := nil;  LinkLast^.Next := Link;  LinkLast := Link;  LinkUse := LinkUse - 1;  LinkCount := LinkCount - 1;  end;  except  WriteLog('FreeLink: error');  end;  end;  procedure CloseLink(Link: PLink);  begin  EnterCriticalSection(LinkSec);  with Link^ do  begin  EnterCriticalSection(Section);  if Socket INVALID_SOCKET then  begin  try  CloseSocket(Socket);  except  WriteLog('CloseSocket: error');  end;  Socket := INVALID_SOCKET;  FreeLink(Link);  end;  LeaveCriticalSection(Link^.Section);  end;  LeaveCriticalSection(LinkSec);  end;  procedure CheckLinkLinkIdleOvertime(Data: Pointer);  var  TickCount: DWord;  Long: Integer;  Link: PLink;  begin  EnterCriticalSection(LinkSec);  try  TickCount := GetTickCount();  Link := LinksHead;  while Link nil do  with Link^ do  begin  EnterCriticalSection(Section);  if Socket INVALID_SOCKET then  begin  if TickCount > TickCountActive then  Long := TickCount - TickCountActive  else  Long := $FFFFFFFF - TickCountActive + TickCount;  if SOCK_IDLE_OVERTIME * 1000 0 do  i := i - 1;  if not PostSend(Link, IOMem, Len) then  FreeIOMem(IOMem);  end;  function OnWorkerThreadCreateDef(IsWorkerThread: Boolean): Pointer;  begin  Result := nil;  end;  var  WorkerHead: PWorker = nil;  WorkerCount: Integer = 0;  OnDisconnectEvt: TOnDisconnectEvt = OnDisconnectDef;  OnReceiveEvt: TOnReceiveEvt = OnReceiveDef;  OnWorkerThreadCreateEvt: TOnThreadCreateEvt = OnWorkerThreadCreateDef;  function GetWorkerCount(): Integer;  begin  Result := WorkerCount;  end;  function WorkerThread(Worker: PWorker): DWORD; stdcall;  var  Link: PLink;  IOInfo: PIOInfo;  Bytes: DWord;  CompletionPort: THandle;  begin  Result := 0;  CompletionPort := Worker^.CompletionPort;  with Worker^ do  begin  TickCountActive := GetTickCount();  TickCountLong := 0;  ExecCount := 0;  end;  WriteLog(Format('Worker thread:%d 
begin', [Worker^.ID]));  CoInitialize(nil);  try  while True do  begin  try  with Worker^ do  TickCountLong := TickCountLong + GetTickCount() - TickCountActive;    if GetQueuedCompletionStatus(CompletionPort, Bytes, DWORD(Link), POverlapped(IOInfo), INFINITE) = False then  begin  if (Link nil) then  with Link^ do  begin  EnterCriticalSection(LinkSec);  EnterCriticalSection(Section);  if Link^.Socket INVALID_SOCKET then  begin  try  CloseSocket(Socket);  except  WriteLog(Format('CloseSocket1:%d error', [Worker^.ID]));  end;  Socket := INVALID_SOCKET;  Link^.Worker := Worker;  try  OnDisconnectEvt(Link);  except  WriteLog(Format('OnDisconnectEvt1:%d error', [Worker^.ID]));  end;  Link^.Worker := nil;  FreeLink(Link);  end;  LeaveCriticalSection(Section);  LeaveCriticalSection(LinkSec);  end;  if IOInfo nil then  FreeIOMem(IOInfo^.DataBuf.buf);  WriteLog(Format('GetQueuedCompletionStatus:%d error', [Worker^.ID]));  continue;  end;    with Worker^ do  begin  TickCountActive := GetTickCount();  ExecCount := ExecCount + 1;  end;  if (Bytes = 0) then  begin  if (Link nil) then  with Link^ do  begin  EnterCriticalSection(LinkSec);  EnterCriticalSection(Section);  if Link^.Socket INVALID_SOCKET then  begin  try  CloseSocket(Socket);  except  WriteLog(Format('CloseSocket2:%d error', [Worker^.ID]));  end;  Socket := INVALID_SOCKET;  Link^.Worker := Worker;  try  OnDisconnectEvt(Link);  except  WriteLog(Format('OnDisconnectEvt2:%d error', [Worker^.ID]));  end;  Link^.Worker := nil;  FreeLink(Link);  end;  LeaveCriticalSection(Section);  LeaveCriticalSection(LinkSec);  if IOInfo.Flag = IO_WRITE then  FreeIOMem(IOInfo^.DataBuf.buf)  else  FreeIOMem(IOInfo^.DataBuf.buf);  continue;  end else  begin  if IOInfo nil then  FreeIOMem(IOInfo^.DataBuf.buf);  break;  end;  end;    if IOInfo.Flag = IO_WRITE then  begin  FreeIOMem(IOInfo^.DataBuf.buf);  continue;  end;    {if IOInfo.Flag = IO_ACCEPT then  begin  ......  
continue;  end;}  with Link^, IOInfo^.DataBuf do  begin  Link^.Worker := Worker;  try  OnReceiveEvt(Link, buf, Bytes);  except  WriteLog(Format('OnReceiveEvt:%d error', [Worker^.ID]));  end;  Link^.Worker := nil;  TickCountActive := GetTickCount();  if not PostRecv(Link, buf) then  begin  EnterCriticalSection(LinkSec);  EnterCriticalSection(Section);  if Socket INVALID_SOCKET then  begin  try  CloseSocket(Socket);  except  WriteLog(Format('CloseSocket3:%d error', [Worker^.ID]));  end;  Socket := INVALID_SOCKET;  Link^.Worker := Worker;  try  OnDisconnectEvt(Link);  except  WriteLog(Format('OnDisconnectEvt3:%d error', [Worker^.ID]));  end;  Link^.Worker := nil;  FreeLink(Link);  end;  LeaveCriticalSection(Section);  LeaveCriticalSection(LinkSec);  FreeIOMem(buf);  end;  end;  except  WriteLog(Format('Worker thread:%d error', [Worker^.ID]));  end;  end;  finally  CoUninitialize();  WriteLog(Format('Worker thread:%d end', [Worker^.ID]));  SetEvent(Worker^.Finished);  end;  end;  procedure CreateWorkerThread(CompletionPort: THandle);  var  Worker, Workers: PWorker;  i: Integer;  SystemInfo: TSystemInfo;  ThreadHandle: THandle;  begin  GetSystemInfo(SystemInfo);  Workers := nil;  WorkerCount := (SystemInfo.dwNumberOfProcessors * 2 + 2);  for i := 1 to WorkerCount do  begin  Worker := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TWorker));  if Workers = nil then  begin  Workers := Worker;  WorkerHead := Workers;  end else  begin  Workers^.Next := Worker;  Workers := Worker;  end;  Worker^.CompletionPort := CompletionPort;  Worker^.Data := OnWorkerThreadCreateEvt(False);  Worker^.Finished := CreateEvent(nil, True, False, nil);  ThreadHandle := CreateThread(nil, 0, @WorkerThread, Worker, 0, Worker^.ID);  if ThreadHandle 0 then  CloseHandle(ThreadHandle);  end;  Workers^.Next := nil;  end;  procedure DestroyWorkerThread();  var  Worker, Save: PWorker;  begin  WorkerCount := 0;  Worker := WorkerHead;  while Worker nil do  begin  
PostQueuedCompletionStatus(Worker^.CompletionPort, 0, 0, nil);  Worker := Worker^.Next;  end;  Worker := WorkerHead;  while Worker nil do  begin  with Worker^ do  begin  WaitForSingleObject(Worker^.Finished, INFINITE);  CloseHandle(Worker^.Finished);  Save := Worker^.Next;  end;  HeapFree(GetProcessHeap(), HEAP_NO_SERIALIZE, Worker);  Worker := Save;  end;  end;  function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;  var  Worker: PWorker;  Count: Integer;  begin  Worker := WorkerHead;  Count := 0;  Result := 0;  while Worker nil do  with Worker^ do  begin  Count := Count + 1;  if Count = Index then  begin  TickCount := TickCountLong;  TickCountLong := 0;  Result := Worker^.ExecCount;  break;  end;  Worker := Worker^.Next;  end;  end;  function GetWorkerID(Index: Integer): Integer;  var  Worker: PWorker;  Count: Integer;  begin  Worker := WorkerHead;  Count := 0;  while Worker nil do  begin  Count := Count + 1;  if Count = Index then  begin  Count := Worker^.ID;  break;  end;  Worker := Worker^.Next;  end;  Result := Count;  end;  {===============================================================================  服务线程  ================================================================================}  function OnServerThreadCreateDef(IsWorkerThread: Boolean): Pointer;  begin  Result := nil;  end;  var  ListenSocket: TSocket = INVALID_SOCKET;  SocketEvent: THandle = WSA_INVALID_EVENT;  CompletionPort: THandle = 0;  Terminated: Boolean = False;  ServerThreadID: DWORD = 0;  ServerExecCount: Integer = 0;  ServerExecLong: DWord = 0;  OnServerThreadCreateEvt: TOnThreadCreateEvt = OnServerThreadCreateDef;  ServerFinished: THandle;  function GetServerExecCount(): Integer;  begin  Result := ServerExecCount;  end;  function GetServerExecLong(): DWord;  begin  Result := ServerExecLong;  ServerExecLong := 0;  end;    function ServerThread(Param: Pointer): DWORD; stdcall;  var  AcceptSocket: TSocket;  Addr: TSockAddrIn;  Len: Integer;  Link: PLink;  IOMem: 
Pointer;  bNodelay: Boolean;  TickCount: DWord;  WR: DWord;  begin  Result := 0;  CoInitialize(nil);  WriteLog('Server thread begin');  TickCount := GetTickCount();  try  while not Terminated do  begin  try  ServerExecLong := ServerExecLong + (GetTickCount() - TickCount);  WR := WaitForSingleObject(SocketEvent, 10000);    ServerExecCount := ServerExecCount + 1;  TickCount := GetTickCount();    if (WAIT_TIMEOUT = WR) then  begin  CheckLinkLinkIdleOvertime(Param);  continue;  end else  if (WAIT_FAILED = WR) then  begin  continue;  end else  begin  Len := SizeOf(TSockAddrIn);  AcceptSocket := WSAAccept(ListenSocket, @Addr, @Len, nil, 0);  if (AcceptSocket = INVALID_SOCKET) then  continue;  if LinkCount >= SOCK_MAX_COUNT then  begin  try  CloseSocket(AcceptSocket);  except  WriteLog('Link count over');  end;  continue;  end;    bNodelay := True;  if SetSockOpt(AcceptSocket, IPPROTO_TCP, TCP_NODELAY,  PChar(@bNodelay), sizeof(bNodelay)) = SOCKET_ERROR then  begin  try  CloseSocket(AcceptSocket);  except  WriteLog('SetSockOpt: error');  end;  continue;  end;  EnterCriticalSection(LinkSec);  Link := GetLink();  with Link^ do  begin  EnterCriticalSection(Section);  RemoteIP := inet_ntoa(Addr.sin_addr);  RemotePort := Addr.sin_port;  TickCountActive := GetTickCount();  Socket := AcceptSocket;  IOMem := GetIOMem();  if (CreateIoCompletionPort(AcceptSocket, CompletionPort, DWORD(Link), 0) = 0) or  (not PostRecv(Link, IOMem)) then  begin  try  CloseSocket(Socket);  except  WriteLog('CreateIoCompletionPort or PostRecv: error');  end;  Socket := INVALID_SOCKET;  FreeLink(Link);  FreeIOMem(IOMem);  end;  LeaveCriticalSection(Section);  end;  LeaveCriticalSection(LinkSec);  end;  except  WriteLog('Server thread error');  end;  end;  finally  CoUninitialize();  WriteLog('Server thread end');  SetEvent(ServerFinished);  end;  end;  function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;  var  NonBlock: Integer;  bNodelay: Boolean;  Addr: TSockAddrIn;  ThreadHandle: 
THANDLE;  begin  Result := ListenSocket = INVALID_SOCKET;  if not Result then  exit;  IniIOMem();  IniLink();    ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);  Result := ListenSocket INVALID_SOCKET;  if not Result then  begin  DeleteLink();  DeleteIOMem();  exit;  end;  bNodelay := True;  NonBlock := 1;  Addr.sin_family := AF_INET;  Addr.sin_addr.s_addr := inet_addr(PChar(RemoteIP));  Addr.sin_port := htons(RemotePort);  Result := (SetSockOpt(ListenSocket, IPPROTO_TCP, TCP_NODELAY, PChar(@bNodelay), sizeof(bNodelay)) SOCKET_ERROR) and  (ioctlsocket(ListenSocket, Integer(FIONBIO), NonBlock) SOCKET_ERROR) and  (Bind(ListenSocket, @Addr, SizeOf(TSockAddrIn)) SOCKET_ERROR) and  (Listen(ListenSocket, SOMAXCONN) SOCKET_ERROR);  if not Result then  begin  ListenSocket := INVALID_SOCKET;  DeleteLink();  DeleteIOMem();  exit;  end;  SocketEvent := CreateEvent(nil, FALSE, FALSE, nil);  Result := (SocketEvent WSA_INVALID_EVENT);  if (not Result) then  begin  CloseSocket(ListenSocket);  ListenSocket := INVALID_SOCKET;  DeleteLink();  DeleteIOMem();  exit;  end;  Result := (WSAEventSelect(ListenSocket, SocketEvent, FD_ACCEPT) SOCKET_ERROR);  if not Result then  begin  CloseSocket(ListenSocket);  ListenSocket := INVALID_SOCKET;  WSACloseEvent(SocketEvent);  SocketEvent := WSA_INVALID_EVENT;  DeleteLink();  DeleteIOMem();  exit;  end;  CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_value, 0, 0, 0);  Result := CompletionPort 0;  if not Result then  begin  CloseSocket(ListenSocket);  ListenSocket := INVALID_SOCKET;  WSACloseEvent(SocketEvent);  SocketEvent := WSA_INVALID_EVENT;  DeleteLink();  DeleteIOMem();  exit;  end;  WriteLog('Server Start');  CreateWorkerThread(CompletionPort);  ServerFinished := CreateEvent(nil, True, False, nil);  Result := ServerFinished 0;  if not Result then  begin  CloseSocket(ListenSocket);  ListenSocket := INVALID_SOCKET;  WSACloseEvent(SocketEvent);  SocketEvent := WSA_INVALID_EVENT;  DeleteLink();  
DeleteIOMem();  exit;  end;  Terminated := False;  ThreadHandle := CreateThread(nil, 0, @ServerThread, OnServerThreadCreateEvt(False), 0, ServerThreadID);  if (ThreadHandle = 0) then  begin  StopTcpServer();  exit;  end;  CloseHandle(ThreadHandle);  end;  function StopTcpServer(): Boolean;  begin  Result := ListenSocket INVALID_SOCKET;  if not Result then  exit;  WriteLog('Server Stop');  Terminated := True;  if ServerFinished 0 then  begin  WaitForSingleObject(ServerFinished, INFINITE);  CloseHandle(ServerFinished);  ServerFinished := 0;  end;  if SocketEvent 0 then  WSACloseEvent(SocketEvent);  SocketEvent := 0;  DestroyWorkerThread();  if ListenSocket INVALID_SOCKET then  CloseSocket(ListenSocket);  ListenSocket := INVALID_SOCKET;  if CompletionPort 0 then  CloseHandle(CompletionPort);  CompletionPort := 0;  ServerExecCount := 0;  ServerExecLong := 0;  DeleteLink();  DeleteIOMem();  end;  function GetLocalIP(IsIntnetIP: Boolean): String;  type  TaPInAddr = Array[0..10] of PInAddr;  PaPInAddr = ^TaPInAddr;  var  phe: PHostEnt;  pptr: PaPInAddr;  Buffer: Array[0..63] of Char;  I: Integer;  begin  Result := '0.0.0.0';  try  GetHostName(Buffer, SizeOf(Buffer));  phe := GetHostByName(buffer);  if phe = nil then  Exit;  pPtr := PaPInAddr(phe^.h_addr_list);  if IsIntnetIP then  begin  I := 0;  while pPtr^[I] nil do  begin  Result := inet_ntoa(pptr^[I]^);  Inc(I);  end;  end else  Result := inet_ntoa(pptr^[0]^);  except  end;  end;  procedure SetEventProc(OnReceive: TOnReceiveEvt;  OnDisconnect: TOnDisconnectEvt;  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;  OnServerThreadCreate: TOnThreadCreateEvt;  OnWorkerThreadCreate: TOnThreadCreateEvt);  begin  OnReceiveEvt := OnReceive;  OnDisconnectEvt := OnDisconnect;  OnLinkIdleOvertimeEvt := OnLinkIdleOvertime;  OnServerThreadCreateEvt := OnServerThreadCreate;  OnWorkerThreadCreateEvt := OnWorkerThreadCreate;  end;  function PostRecv(Link: PLink; IOMem: Pointer): Boolean;  var  Flags: DWord;  Bytes: DWord;  IOInfo: PIOInfo;  
begin  Result := Link^.Socket INVALID_SOCKET;  if Result then  try  Flags := 0;  Bytes := 0;  IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));  with IOInfo^ do  begin  ZeroMemory(IOInfo, sizeof(TIOInfo));  DataBuf.buf := IOMem;  DataBuf.len := IO_MEM_SIZE;  Socket := Link^.Socket;  Flag := IO_READ;  Result := (WSARecv(Socket, @DataBuf, 1, @Bytes, @Flags, @Overlapped, nil) SOCKET_ERROR) or  (WSAGetLastError() = ERROR_IO_PENDING);  end;  except  Result := False;  WriteLog('PostRecv: error');  end;  end;  function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;  var  Bytes: DWord;  IOInfo: PIOInfo;  begin  Result := Link^.Socket INVALID_SOCKET;  if Result then  try  Bytes := 0;  IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));  with IOInfo^ do  begin  ZeroMemory(IOInfo, sizeof(TIOInfo));  DataBuf.buf := IOMem;  DataBuf.len := Len;  Socket := Link^.Socket;  Flag := IO_WRITE;  Result := (WSASend(Socket, @(DataBuf), 1, @Bytes, 0, @(Overlapped), nil) SOCKET_ERROR) or  (WSAGetLastError() = ERROR_IO_PENDING);  end;  except  Result := False;  WriteLog('PostSend: error');  end;  end;  procedure PostBroadcast(Buf: PByte; Len: Integer);  var  IOMem: Pointer;  Link: PLink;  begin  EnterCriticalSection(LinkSec);  Link := LinksHead;  while Link nil do  with Link^ do  begin  if Socket INVALID_SOCKET then  begin  IOMem := GetIOMem();  CopyMemory(IOMem, Buf, Len);  if not PostSend(Link, IOMem, Len) then  FreeIOMem(IOMem);  end;  Link := Link^.Next;  end;  LeaveCriticalSection(LinkSec);  end;  function IsTcpServerActive(): Boolean;  begin  Result := ListenSocket INVALID_SOCKET;  end;  {===============================================================================  日志管理  ================================================================================}  var  LogSec: TRTLCriticalSection;  Inifile: TIniFile;  LogCount: Integer = 0;  LogName: String = '';  procedure WriteLog(Log: String);  begin  EnterCriticalSection(LogSec);  try  LogCount := LogCount + 1;  
IniFile.WriteString(LogName,  'Index' + IntToStr(LogCount),  DateTimeToStr(Now()) + ':' + Log);  finally  LeaveCriticalSection(LogSec);  end;  end;  {===============================================================================  初始化Window Socket  ================================================================================}  var  WSAData: TWSAData;    procedure Startup;  var  ErrorCode: Integer;  begin  ErrorCode := WSAStartup( {$SK_blogItemTitle$}  {$SK_ItemBody$}    {$SK_blogDiary$} {$SK_blogItemLink$} {$SK_blogItemComm$} {$SK_blogItemQuote$} {$SK_blogItemVisit$}    01, WSAData);  if ErrorCode 0 then  WriteLog('Window Socket init Error!');  end;  procedure Cleanup;  var  ErrorCode: Integer;  begin  ErrorCode := WSACleanup;  if ErrorCode 0 then  WriteLog('Window Socket cleanup error!');  end;  function GetExePath(): String;  var  ModuleName: array[0..1024] of char;  begin  GetModuleFileName(MainInstance, ModuleName, SizeOf(ModuleName));  Result := ExtractFilePath(ModuleName);  end;  initialization  LogName := DateTimeToStr(Now());  InitializeCriticalSection(LogSec);  ExePath := GetExePath();  IniFile := TIniFile.Create(ExePath + 'Logs.Ini');  Startup();  finalization  Cleanup();  DeleteCriticalSection(LogSec);  IniFile.Destroy();    end.    
// Main window unit source:
//
// Demo/monitoring form for the UTcpServer IOCP module. It starts and stops
// the server from a single button, shows per-worker statistics in a grid,
// refreshes memory/link usage labels on Timer1, and broadcasts a fixed test
// message to all links on Timer2.
unit uMainTcpServerIOCP;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, ExtCtrls, StdCtrls, ComCtrls, UTcpServer, Sockets, Grids;

type
  TfrmMainUTcpServerIOCP = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    edtIP: TEdit;
    edtPort: TEdit;
    btn: TButton;
    Timer1: TTimer;
    Label3: TLabel;
    lbIO: TLabel;
    Label5: TLabel;
    lbIOU: TLabel;
    Label7: TLabel;
    lbL: TLabel;
    Label9: TLabel;
    lbLU: TLabel;
    Label11: TLabel;
    lbLS: TLabel;
    Label13: TLabel;
    lbW: TLabel;
    Info: TStringGrid;
    Label4: TLabel;
    lbWC: TLabel;
    Label8: TLabel;
    lbWU: TLabel;
    Label12: TLabel;
    lbLF: TLabel;
    Label15: TLabel;
    lbLFL: TLabel;
    Label6: TLabel;
    lbIOF: TLabel;
    lbIOFL: TLabel;
    Label16: TLabel;
    Timer2: TTimer;
    procedure btnClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer2Timer(Sender: TObject);
  private
    { Private declarations }
    // Tick count captured at the end of the previous statistics refresh
    // (or at server start); used as the base of the measuring interval.
    FTickCount: DWord;
  public
    { Public declarations }
  end;

var
  frmMainUTcpServerIOCP: TfrmMainUTcpServerIOCP;

implementation

{$R *.dfm}

{ TfrmMainUTcpServerIOCP }

// Toggles the server. When the caption is 'Open' the server is started on
// the IP/port typed in the edit boxes and the worker grid is filled with one
// row per worker thread; otherwise the server is stopped. The caption is then
// set from the actual server state, so a failed start leaves it at 'Open'.
// StrToInt raises EConvertError if the port text is not a number.
procedure TfrmMainUTcpServerIOCP.btnClick(Sender: TObject);
var
  i: Integer;
  C1: Integer;
  C2: DWord;
  DT: TDateTime;
begin
  if btn.Caption = 'Open' then
  begin
    StartTcpServer(edtIP.Text, StrToInt(edtPort.Text));
    if IsTcpServerActive() then
    begin
      FTickCount := GetTickCount();
      Info.RowCount := GetWorkerCount() + 1; // +1 for the header row
      DT := Now();
      for i := 1 to Info.RowCount - 1 do
      begin
        Info.Cells[0, i] := IntToStr(i);
        Info.Cells[1, i] := IntToStr(GetWorkerID(i));
        // GetWorkerExecInfo only assigns its var parameter when the worker
        // index exists, so give it a defined value first.
        C2 := 0;
        C1 := GetWorkerExecInfo(i, C2);
        Info.Cells[2, i] := IntToStr(C1);
        Info.Cells[3, i] := '0';
        Info.Cells[4, i] := IntToStr(C2);
        Info.Cells[5, i] := '0';
        Info.Cells[6, i] := DateTimeToStr(DT);
      end;
      Timer1.Enabled := True;
    end;
  end
  else
  begin
    Timer1.Enabled := False;
    StopTcpServer();
  end;

  if IsTcpServerActive() then
    btn.Caption := 'Close'
  else
    btn.Caption := 'Open';
end;

// Pre-fills the IP edit with the local address and lays out the statistics
// grid (7 columns, header row plus one placeholder row).
procedure TfrmMainUTcpServerIOCP.FormCreate(Sender: TObject);
begin
  edtIP.Text := GetLocalIP(False);

  Info.ColCount := 7;
  Info.RowCount := 2;

  Info.ColWidths[0] := 30;
  Info.ColWidths[1] := 30;
  Info.ColWidths[2] := 40;
  Info.ColWidths[3] := 40;
  Info.ColWidths[4] := 30;
  Info.ColWidths[5] := 40;
  Info.ColWidths[6] := 110;

  Info.Cells[0, 0] := '序号';
  Info.Cells[1, 0] := 'ID';
  Info.Cells[2, 0] := '计数';
  Info.Cells[3, 0] := '次/S';
  Info.Cells[4, 0] := '时长';
  Info.Cells[5, 0] := '使用率';
  Info.Cells[6, 0] := '时间';
end;

// Periodic statistics refresh. Stops itself when the server is not active.
// Updates the server-thread labels, then each worker row whose execution
// count changed since the last refresh, then the IO-memory and link labels.
procedure TfrmMainUTcpServerIOCP.Timer1Timer(Sender: TObject);
var
  i: Integer;
  Count1, Count2, Count3, TC, TCC: DWord;
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;

  // Elapsed ticks since the previous refresh; a zero interval is replaced by
  // the largest DWord so the divisions below cannot divide by zero.
  TC := GetTickCount();
  TCC := TC - FTickCount;
  if TCC = 0 then
    TCC := $FFFFFFFF;

  lbWC.Caption := IntToStr(GetServerExecCount());
  lbWU.Caption := FloatToStrF(GetServerExecLong() / TCC * 100, ffFixed, 10, 3) + '%';

  for i := 1 to Info.RowCount - 1 do
  begin
    // Defined value in case the worker index is not found.
    Count2 := 0;
    Count1 := GetWorkerExecInfo(i, Count2);

    TC := GetTickCount();
    TCC := TC - FTickCount;
    if TCC = 0 then
      TCC := $FFFFFFFF;

    // Previous execution count is kept in the grid cell itself.
    Count3 := StrToInt(Info.Cells[2, i]);
    if Count1 <> Count3 then
    begin
      Info.Cells[2, i] := IntToStr(Count1);
      Info.Cells[3, i] := IntToStr(Count1 - Count3);
      Info.Cells[4, i] := IntToStr(Count2);
      Info.Cells[5, i] := FloatToStrF(Count2 / TCC * 100, ffFixed, 10, 1) + '%';
      Info.Cells[6, i] := DateTimeToStr(Now());
    end;
  end;
  FTickCount := TC;

  lbIO.Caption := IntToStr(GetIOMemSize());
  lbIOU.Caption := FloatToStrF(GetIOMemUse(), ffFixed, 10, 3) + '%';
  Count1 := GetIOMemFree();
  lbIOF.Caption := IntToStr(Count1);
  lbIOFL.Caption := FloatToStrF(Count1 / IO_MEM_MAX_COUNT * 100, ffFixed, 10, 3) + '%';

  lbW.Caption := IntToStr(GetWorkerCount());

  lbL.Caption := IntToStr(GetLinkSize());
  Count1 := GetLinkFree();
  lbLF.Caption := IntToStr(Count1);
  lbLFL.Caption := FloatToStrF(Count1 / SOCK_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbLU.Caption := FloatToStrF(GetLinkUse(), ffFixed, 10, 3) + '%';
  lbLS.Caption := IntToStr(GetLinkCount());
end;

// Makes sure the server is shut down when the form goes away.
procedure TfrmMainUTcpServerIOCP.FormDestroy(Sender: TObject);
begin
  StopTcpServer();
end;

// Periodic broadcast of a fixed test message to every connected link.
procedure TfrmMainUTcpServerIOCP.Timer2Timer(Sender: TObject);
begin
  if not IsTcpServerActive() then
  begin
    // NOTE(review): this disables Timer1, not Timer2. Kept as in the
    // original: nothing in this unit re-enables Timer2, so disabling it here
    // would stop broadcasts for good after the first inactive tick.
    Timer1.Enabled := False;
    exit;
  end;
  // NOTE(review): the length 21 presumably assumes an ANSI (GBK) build where
  // the literal is 10 double-byte characters plus '!' - confirm before
  // compiling with a Unicode Delphi version.
  PostBroadcast(PByte(PChar('这是来自服务器的数据!')), 21);
end;

end.

 

转载于:https://my.oschina.net/u/3346994/blog/868856

你可能感兴趣的文章
javascript实现的一个信息提示的小功能/
查看>>
Centos7.x:开机启动服务的配置和管理
查看>>
HTML5 浏览器返回按钮/手机返回按钮事件监听
查看>>
xss
查看>>
iOS:百度长语音识别具体的封装:识别、播放、进度刷新
查看>>
JS获取服务器时间并且计算距离当前指定时间差的函数
查看>>
华为硬件工程师笔试题
查看>>
jquery居中窗口-页面加载直接居中
查看>>
cd及目录快速切换
查看>>
Unity Shaders and Effects Cookbook (3-5) 金属软高光
查看>>
31-hadoop-hbase-mapreduce操作hbase
查看>>
C++ 代码风格准则:POD
查看>>
linux-友好显示文件大小
查看>>
【转】【WPF】WPF中MeasureOverride ArrangeOverride 的理解
查看>>
【转】二叉树的非递归遍历
查看>>
NYOJ283对称排序
查看>>
接连遇到大牛
查看>>
[Cocos2d-x For WP8]矩形碰撞检测
查看>>
自己写spring boot starter
查看>>
花钱删不完负面消息
查看>>