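Many people have searched hard for a complete I/O completion port (I/O CP) example without finding one, and some of the examples in circulation are outright misleading. I am therefore publishing the example I wrote myself, hoping it helps those who are still looking. It can serve as study material for beginners, and it can also be used as the communication module of a large server program. Its processing path has been optimized as far as I could take it. Once you understand the core ideas and add an efficient communication protocol, you can build a high-performance communication server on top of it.

Before the code, a few words about I/O CP. I will not explain the I/O CP functions themselves; there is plenty of material online and it all says the same thing. Here I only cover the technical points that need attention.

I. How to manage memory

1. IO data buffer management

Dynamic memory allocation is flexible, but on a server it wastes a great deal of system resources. I therefore pre-allocate the maximum amount of memory the server will need and manage it with a linked list. Allocating or returning a block never requires a traversal, only mutual exclusion.

Better still, the per-operation IO information and the memory block are combined into one structure, which reduces the list management work.

  //IO operation flag
  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

  //IO operation information
  PIOInfo =^ TIOInfo;
  TIOInfo = packed record
    Overlapped: TOverlapped; //overlapped structure
    DataBuf: TWSABUF; //IO data information
    Socket: TSocket;
    Flag: TIOFlag;
    TickCountSend: DWord;
    Next: PIOInfo;
    Prior: PIOInfo;
  end;

  PUNode =^ TUNode;
  TUNode = record
    Next: Pointer;
  end;

  PIOMem =^ TIOMem;
  TIOMem = packed record
    IOInfo: TIOInfo;
    Data: array[1..IO_MEM_SIZE] of Byte;
    //when memory is requested, the address of Data is what gets returned
  end;

2. Link data management

A doubly linked list is used, so that deleting a node does not cost a traversal.

  //Information for each connection
  PLink =^ TLink;
  TLink = record
    Socket: TSocket;
    RemoteIP: string[30];
    RemotePort: DWord;
    //system tick count when data was last received
    TickCountActive: DWord;
    //information about the thread currently handling this connection
    Worker: PWorker;
    Data: Pointer; //the application layer can set this member, so that OnReceive does not have to search for the connection's data area every time
    Section: TRTLCriticalSection;
    Next: PLink;
    Prior: PLink;
  end;

II. How to manage threads

When each worker thread is created, OnWorkerThreadCreateEvt is called. This function can return information associated with the thread, for example a database connection component or a class instance created for it. In OnReceive, that value can be reached through the Link's Worker member as Worker^.Data.

  //Worker thread information
  PWorker =^ TWorker;
  TWorker = record
    ID: THandle;
    CompletionPort: THandle;
    Data: Pointer; //the value returned by OnWorkerThreadCreateEvt
    //data that reflects how busy the thread is
    TickCountLong,
    TickCountActive: DWord;
    ExecCount: Integer;
    //set when the thread has finished
    Finished: THandle;
    Next: PWorker;
  end;

The same applies to the service thread; see the source for details.

Thread synchronization has always been a headache in many programs. This example avoids excessive locking where possible and guards against deadlock. With RTLCriticalSection, a little carelessness is enough to cause a deadlock; in multithreaded code a difference of two lines can be disastrous. In this example, the data that needs synchronization is the list of links. The service thread has to work out which connections have been idle too long, the worker threads have to handle disconnections, and the application layer needs exclusive access to a link when it sends data on its own initiative. Otherwise one thread may be sending while another is handling a disconnection on the same link, and the resulting conflict has disastrous consequences.

In my stress tests this problem was effectively solved: the application layer needs no synchronization of its own and can simply send and receive data. Each thread also supports its own database connection.

III. How many worker threads should be created

Many articles say to create N threads for N CPUs; others say N*2+2. I dislike claims made without evidence. This example creates dwNumberOfProcessors * 2 + 2 worker threads (see CreateWorkerThread) and reports per-thread counters, so newcomers to I/O CP can observe the behavior for themselves and understand it more deeply.

Example test results:

IV. Should classes be used?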
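Some people say a server should discard classes entirely because classes carry a high cost. In my view, the cost attributed to classes comes mainly from creating objects dynamically. Accessing a class member is the same as accessing a record member: both go through an offset from a base address. If everything is created in advance, there is little difference between the two. This example uses plain functions without classes, but the application layer can of course use classes for management; it is hard to imagine how much extra work there would be without them.

V. Shortcomings

It cannot send large packets, only packets up to a fixed size (IO_MEM_SIZE, 2048 bytes in the source below). For small packets, however, it performs well.

For lack of time I cannot explain more or comment the code more thoroughly. Anyone who needs the example source can contact me; it is provided free of charge. QQ:48092788

Example source: http://d.download.csdn.net/down/1546336/guestcode

Source of the completion port communication service module:

{******************************************************************************
* UCode series components and controls
* Author: 卢益贵 (Lu Yigui) 2003~2009
* All rights reserved. The right to pursue legal liability for any
* unauthorized use or sale is reserved.
*
* The UCode series evolved from the XCtrls-YCtrls-ICtrls-NCode series
* QQ:48092788 luyigui.blog.gxsky.com
******************************************************************************}
{******************************************************************************
  Socket server based on the completion port model
******************************************************************************}
unit UTcpServer;

interface

uses
  Windows, Classes, UClasses, UWinSock2;

const
  //Size of each IO buffer
  IO_MEM_SIZE = 2048;
  //There must be enough memory; adjust as needed
  IO_MEM_MAX_COUNT = 1000 * 10;
  //Maximum number of connections
  SOCK_MAX_COUNT = 3000;
  //Connection idle limit in seconds: a connection is closed if no client data arrives within this time
  SOCK_IDLE_OVERTIME = 60;

type
  //Worker thread information
  PWorker =^ TWorker;
  TWorker = record
    ID: THandle;
    CompletionPort: THandle;
    Data: Pointer;
    //data that reflects how busy the thread is
    TickCountLong,
    TickCountActive: DWord;
    ExecCount: Integer;
    //set when the thread has finished
    Finished: THandle;
    Next: PWorker;
  end;

  //Information for each connection
  PLink =^ TLink;
  TLink = record
    Socket: TSocket;
    RemoteIP: string[30];
    RemotePort: DWord;
    //system tick count when data was last received
    TickCountActive: DWord;
    //information about the thread currently handling this connection
    Worker: PWorker;
    Data: Pointer;
    Section: TRTLCriticalSection;
    Next: PLink;
    Prior: PLink;
  end;

  TOnLinkIdleOvertimeEvt = procedure(Link: PLink);
  TOnDisconnectEvt = procedure(Link: PLink);
  TOnReceiveEvt = function(Link: PLink; Buf: PByte; Len: Integer): Boolean;
  TOnThreadCreateEvt = function(IsWorkerThread: Boolean): Pointer;

//Get the usage of the link list, in percent
function GetLinkUse(): real;
//Memory occupied by the link list
function GetLinkSize(): Integer;
//Current number of links
function GetLinkCount(): Integer;
//Number of free links
function GetLinkFree(): Integer;
//IO memory usage
function GetIOMemUse(): Real;
//Memory occupied by the IO memory list
function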
GetIOMemSize(): Integer;
//Number of free IO memory blocks
function GetIOMemFree(): Integer;
//Return an IO memory block
procedure FreeIOMem(Mem: Pointer);
//Get an IO memory block
function GetIOMem(): Pointer;
//Get the work statistics of a worker thread
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
//Get the ID of a worker thread
function GetWorkerID(Index: Integer): Integer;
//Get the number of worker threads
function GetWorkerCount(): Integer;
//Open an IP port and listen on it
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
//Stop and close the IP port
function StopTcpServer(): Boolean;
//Set the event handler function pointers; call before StartTcpServer
procedure SetEventProc(OnReceive: TOnReceiveEvt;
  OnDisconnect: TOnDisconnectEvt;
  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
  OnServerThreadCreate: TOnThreadCreateEvt;
  OnWorkerThreadCreate: TOnThreadCreateEvt);
//Write to the log file
procedure WriteLog(Log: String);
function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
//Post a send request
function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
//Broadcast data to the peer of every link
procedure PostBroadcast(Buf: PByte; Len: Integer);
//Whether the server is currently open
function IsTcpServerActive(): Boolean;
//Get the time (ms) taken by the service thread's most recent work
function GetServerExecLong(): DWord;
//Get the number of times the service thread has run
function GetServerExecCount(): Integer;
//Get the local or public IP address
function GetLocalIP(IsIntnetIP: Boolean): String;

implementation

uses
  IniFiles, SysUtils, ActiveX;

var
  ExePath: String = '';

const
  HEAP_NO_SERIALIZE = 1; {no serialization: the heap call takes no lock, so the caller must guarantee mutual exclusion}
  HEAP_GENERATE_EXCEPTIONS = 4; {raise an exception on failure instead of returning nil}
  HEAP_ZERO_MEMORY = 8; {initialize the allocated memory to 0}
  HEAP_REALLOC_IN_PLACE_ONLY = 16; {do not allow the block to be moved}
  STATUS_ACCESS_VIOLATION = DWORD($C0000005); {invalid parameter (access violation)}
  STATUS_NO_MEMORY = DWORD($C0000017); {out of memory}

{===============================================================================
  IO memory management
================================================================================}
type
  //IO operation flag
  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

  //IO operation information
  PIOInfo =^ TIOInfo;
  TIOInfo = packed record
    Overlapped: TOverlapped; //overlapped structure
    DataBuf: TWSABUF; //IO data information
    Socket: TSocket;
    Flag: TIOFlag;
    TickCountSend: DWord;
    Next: PIOInfo;
    Prior: PIOInfo;
  end;

  PUNode =^ TUNode;
  TUNode = record
    Next: Pointer;
  end;

  PIOMem =^ TIOMem;
  TIOMem = packed record
    IOInfo: TIOInfo;
    Data: array[1..IO_MEM_SIZE] of Byte;
  end;

var
  IOMemHead: PIOMem = nil;
  IOMemLast: PIOMem = nil;
  IOMemUse: Integer = 0;
  IOMemSec: TRTLCriticalSection;
  IOMemList: array[1..IO_MEM_MAX_COUNT] of Pointer;

function GetIOMem(): Pointer;
begin
  //There must be enough memory; if there is not, even dynamic allocation cannot save you
  EnterCriticalSection(IOMemSec);
  try
    try
      //hand out the Data address and unlink the block from the head of the free list
      Result := @(IOMemHead^.Data);
      IOMemHead := PUNode(IOMemHead)^.Next;
      IOMemUse := IOMemUse + 1;
    except
      Result := nil;
      WriteLog('GetIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;

procedure FreeIOMem(Mem: Pointer);
begin
  EnterCriticalSection(IOMemSec);
  try
    try
      //step back from the Data address to the start of the block, then append it to the tail
      Mem := Pointer(Integer(Mem) - sizeof(TIOInfo));
      PUNode(Mem).Next := nil;
      PUNode(IOMemLast)^.Next := Mem;
      IOMemLast := Mem;
      IOMemUse := IOMemUse - 1;
    except
      WriteLog('FreeIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;

procedure IniIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  InitializeCriticalSection(IOMemSec);
  IOMemHead := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TIOMem));
  IOMemLast := IOMemHead;
  IOMemList[1] := IOMemHead;
  Heap := GetProcessHeap();
  for i := 2 to IO_MEM_MAX_COUNT do
  begin
    PUNode(IOMemLast)^.Next := HeapAlloc(Heap, HEAP_ZERO_MEMORY, sizeof(TIOMem));
    IOMemList[i] := PUNode(IOMemLast)^.Next;
    IOMemLast := PUNode(IOMemLast)^.Next;
  end;
  PUNode(IOMemLast).Next := nil;
end;

function GetIOMemFree(): Integer;
var
  IOMems: PUNode;
begin
  EnterCriticalSection(IOMemSec);
  Result := 0;
  IOMems := PUNode(IOMemHead);
  while IOMems <> nil do
  begin
    Result := Result + 1;
    IOMems := IOMems^.Next;
  end;
  LeaveCriticalSection(IOMemSec);
end;

procedure DeleteIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  for i := 1 to IO_MEM_MAX_COUNT do
    HeapFree(Heap, HEAP_NO_SERIALIZE, IOMemList[i]);
  IOMemUse := 0;
  DeleteCriticalSection(IOMemSec);
end;

function GetIOMemSize(): Integer;
begin
  Result := IO_MEM_MAX_COUNT * sizeof(TIOMem);
end;
function GetIOMemUse(): Real;
begin
  Result := (IOMemUse * 100) / IO_MEM_MAX_COUNT;
end;

{===============================================================================
  Socket link management
================================================================================}
procedure OnLinkIdleOvertimeDef(Link: PLink);
begin
end;

var
  //free list of links
  LinkHead: PLink = nil;
  LinkLast: PLink = nil;
  LinkUse: Integer = 0;
  LinkCount: Integer = 0;
  LinkSec: TRTLCriticalSection;
  LinkList: array[1..SOCK_MAX_COUNT] of PLink;
  OnLinkIdleOvertimeEvt: TOnLinkIdleOvertimeEvt = OnLinkIdleOvertimeDef;
  //doubly linked list of links in use
  LinksHead: PLink = nil;
  LinksLast: PLink = nil;

function GetLinkFree(): Integer;
var
  Links: PLink;
begin
  EnterCriticalSection(LinkSec);
  Result := 0;
  Links := LinkHead;
  while Links <> nil do
  begin
    Result := Result + 1;
    Links := Links^.Next;
  end;
  LeaveCriticalSection(LinkSec);
end;

function GetLink(): PLink;
begin
  try
    //There must be enough memory; if there is not, even dynamic allocation cannot save you
    Result := LinkHead;
    LinkHead := LinkHead^.Next;
    LinkUse := LinkUse + 1;
    LinkCount := LinkCount + 1;
    if LinksHead = nil then
    begin
      LinksHead := Result;
      LinksHead^.Next := nil;
      LinksHead^.Prior := nil;
      LinksLast := LinksHead;
    end else
    begin
      Result^.Prior := LinksLast;
      LinksLast^.Next := Result;
      LinksLast := Result;
      LinksLast^.Next := nil;
    end;
    with Result^ do
    begin
      Socket := INVALID_SOCKET;
      RemoteIP := '';
      RemotePort := 0;
      TickCountActive := GetTickCount();
      Worker := nil;
      Data := nil;
    end;
  except
    Result := nil;
    WriteLog('GetLink: error');
  end;
end;

procedure FreeLink(Link: PLink);
begin
  try
    with Link^ do
    begin
      Link^.Worker := nil;
      //unlink from the in-use list
      if Link = LinksHead then
      begin
        LinksHead := Next;
        if LinksLast = Link then
          LinksLast := LinksHead
        else
          LinksHead^.Prior := nil;
      end else
      begin
        Prior^.Next := Next;
        if Next <> nil then
          Next^.Prior := Prior;
        if Link = LinksLast then
          LinksLast := Prior;
      end;
      //append to the tail of the free list
      Next := nil;
      LinkLast^.Next := Link;
      LinkLast := Link;
      LinkUse := LinkUse - 1;
      LinkCount := LinkCount - 1;
    end;
  except
    WriteLog('FreeLink: error');
  end;
end;

procedure CloseLink(Link: PLink);
begin
  EnterCriticalSection(LinkSec);
  with Link^ do
  begin
    EnterCriticalSection(Section);
    if Socket <> INVALID_SOCKET then
    begin
      try
        CloseSocket(Socket);
      except
        WriteLog('CloseSocket: error');
      end;
      Socket := INVALID_SOCKET;
      FreeLink(Link);
    end;
    LeaveCriticalSection(Link^.Section);
  end;
  LeaveCriticalSection(LinkSec);
end;

procedure CheckLinkLinkIdleOvertime(Data: Pointer);
var
  TickCount: DWord;
  Long: Integer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  try
    TickCount := GetTickCount();
    Link := LinksHead;
    while Link <> nil do
      with Link^ do
      begin
        EnterCriticalSection(Section);
        if Socket <> INVALID_SOCKET then
        begin
          if TickCount > TickCountActive then
            Long := TickCount - TickCountActive
          else
            Long := $FFFFFFFF - TickCountActive + TickCount;
          if SOCK_IDLE_OVERTIME * 1000 <
{ ... the source text between this "<" and the next ">" is missing ... }
  > 0 do
    i := i - 1;
  if not PostSend(Link, IOMem, Len) then
    FreeIOMem(IOMem);
end;

function OnWorkerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  WorkerHead: PWorker = nil;
  WorkerCount: Integer = 0;
  OnDisconnectEvt: TOnDisconnectEvt = OnDisconnectDef;
  OnReceiveEvt: TOnReceiveEvt = OnReceiveDef;
  OnWorkerThreadCreateEvt: TOnThreadCreateEvt = OnWorkerThreadCreateDef;

function GetWorkerCount(): Integer;
begin
  Result := WorkerCount;
end;

function WorkerThread(Worker: PWorker): DWORD; stdcall;
var
  Link: PLink;
  IOInfo: PIOInfo;
  Bytes: DWord;
  CompletionPort: THandle;
begin
  Result := 0;
  CompletionPort := Worker^.CompletionPort;
  with Worker^ do
  begin
    TickCountActive := GetTickCount();
    TickCountLong := 0;
    ExecCount := 0;
  end;
  WriteLog(Format('Worker thread:%d begin', [Worker^.ID]));
  CoInitialize(nil);
  try
    while True do
    begin
      try
        with Worker^ do
          TickCountLong := TickCountLong + GetTickCount() - TickCountActive;
        //a failed completion: close the link and release the buffer
        if GetQueuedCompletionStatus(CompletionPort, Bytes, DWORD(Link),
          POverlapped(IOInfo), INFINITE) = False then
        begin
          if (Link <> nil) then
            with Link^ do
            begin
              EnterCriticalSection(LinkSec);
              EnterCriticalSection(Section);
              if Link^.Socket <> INVALID_SOCKET then
              begin
                try
                  CloseSocket(Socket);
                except
                  WriteLog(Format('CloseSocket1:%d error', [Worker^.ID]));
                end;
                Socket := INVALID_SOCKET;
                Link^.Worker := Worker;
                try
                  OnDisconnectEvt(Link);
                except
                  WriteLog(Format('OnDisconnectEvt1:%d error', [Worker^.ID]));
                end;
                Link^.Worker := nil;
                FreeLink(Link);
              end;
              LeaveCriticalSection(Section);
              LeaveCriticalSection(LinkSec);
            end;
          if IOInfo <> nil then
            FreeIOMem(IOInfo^.DataBuf.buf);
          WriteLog(Format('GetQueuedCompletionStatus:%d error', [Worker^.ID]));
          continue;
        end;
        with Worker^ do
        begin
          TickCountActive := GetTickCount();
          ExecCount := ExecCount + 1;
        end;
        //zero bytes: either the peer closed the connection (Link <> nil)
        //or DestroyWorkerThread posted the quit packet (Link = nil)
        if (Bytes = 0) then
        begin
          if (Link <> nil) then
            with Link^ do
            begin
              EnterCriticalSection(LinkSec);
              EnterCriticalSection(Section);
              if Link^.Socket <> INVALID_SOCKET then
              begin
                try
                  CloseSocket(Socket);
                except
                  WriteLog(Format('CloseSocket2:%d error', [Worker^.ID]));
                end;
                Socket := INVALID_SOCKET;
                Link^.Worker := Worker;
                try
                  OnDisconnectEvt(Link);
                except
                  WriteLog(Format('OnDisconnectEvt2:%d error', [Worker^.ID]));
                end;
                Link^.Worker := nil;
                FreeLink(Link);
              end;
              LeaveCriticalSection(Section);
              LeaveCriticalSection(LinkSec);
              if IOInfo.Flag = IO_WRITE then
                FreeIOMem(IOInfo^.DataBuf.buf)
              else
                FreeIOMem(IOInfo^.DataBuf.buf);
              continue;
            end
          else
          begin
            if IOInfo <> nil then
              FreeIOMem(IOInfo^.DataBuf.buf);
            break;
          end;
        end;
        //a completed send only needs its buffer returned to the pool
        if IOInfo.Flag = IO_WRITE then
        begin
          FreeIOMem(IOInfo^.DataBuf.buf);
          continue;
        end;
        {if IOInfo.Flag = IO_ACCEPT then
        begin
          ......
          continue;
        end;}
        //a completed receive: hand the data to the application, then post the next receive on the same buffer
        with Link^, IOInfo^.DataBuf do
        begin
          Link^.Worker := Worker;
          try
            OnReceiveEvt(Link, buf, Bytes);
          except
            WriteLog(Format('OnReceiveEvt:%d error', [Worker^.ID]));
          end;
          Link^.Worker := nil;
          TickCountActive := GetTickCount();
          if not PostRecv(Link, buf) then
          begin
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket3:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt3:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
            FreeIOMem(buf);
          end;
        end;
      except
        WriteLog(Format('Worker thread:%d error', [Worker^.ID]));
      end;
    end;
  finally
    CoUninitialize();
    WriteLog(Format('Worker thread:%d end', [Worker^.ID]));
    SetEvent(Worker^.Finished);
  end;
end;

procedure CreateWorkerThread(CompletionPort: THandle);
var
  Worker, Workers: PWorker;
  i: Integer;
  SystemInfo: TSystemInfo;
  ThreadHandle: THandle;
begin
  GetSystemInfo(SystemInfo);
  Workers := nil;
  WorkerCount := (SystemInfo.dwNumberOfProcessors * 2 + 2);
  for i := 1 to WorkerCount do
  begin
    Worker := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TWorker));
    if Workers = nil then
    begin
      Workers := Worker;
      WorkerHead := Workers;
    end else
    begin
      Workers^.Next := Worker;
      Workers := Worker;
    end;
    Worker^.CompletionPort := CompletionPort;
    Worker^.Data := OnWorkerThreadCreateEvt(False);
    Worker^.Finished := CreateEvent(nil, True, False, nil);
    ThreadHandle := CreateThread(nil, 0, @WorkerThread, Worker, 0, Worker^.ID);
    if ThreadHandle <> 0 then
      CloseHandle(ThreadHandle);
  end;
  Workers^.Next := nil;
end;

procedure DestroyWorkerThread();
var
  Worker, Save: PWorker;
begin
  WorkerCount := 0;
  //post one quit packet per worker thread
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    PostQueuedCompletionStatus(Worker^.CompletionPort, 0, 0, nil);
    Worker := Worker^.Next;
  end;
  Worker := WorkerHead;
  while
  Worker <> nil do
  begin
    with Worker^ do
    begin
      WaitForSingleObject(Worker^.Finished, INFINITE);
      CloseHandle(Worker^.Finished);
      Save := Worker^.Next;
    end;
    HeapFree(GetProcessHeap(), HEAP_NO_SERIALIZE, Worker);
    Worker := Save;
  end;
end;

function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  Result := 0;
  while Worker <> nil do
    with Worker^ do
    begin
      Count := Count + 1;
      if Count = Index then
      begin
        TickCount := TickCountLong;
        TickCountLong := 0;
        Result := Worker^.ExecCount;
        break;
      end;
      Worker := Worker^.Next;
    end;
end;

function GetWorkerID(Index: Integer): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  while Worker <> nil do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      Count := Worker^.ID;
      break;
    end;
    Worker := Worker^.Next;
  end;
  Result := Count;
end;

{===============================================================================
  Service thread
================================================================================}
function OnServerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  ListenSocket: TSocket = INVALID_SOCKET;
  SocketEvent: THandle = WSA_INVALID_EVENT;
  CompletionPort: THandle = 0;
  Terminated: Boolean = False;
  ServerThreadID: DWORD = 0;
  ServerExecCount: Integer = 0;
  ServerExecLong: DWord = 0;
  OnServerThreadCreateEvt: TOnThreadCreateEvt = OnServerThreadCreateDef;
  ServerFinished: THandle;

function GetServerExecCount(): Integer;
begin
  Result := ServerExecCount;
end;

function GetServerExecLong(): DWord;
begin
  Result := ServerExecLong;
  ServerExecLong := 0;
end;

function ServerThread(Param: Pointer): DWORD; stdcall;
var
  AcceptSocket: TSocket;
  Addr: TSockAddrIn;
  Len: Integer;
  Link: PLink;
  IOMem: Pointer;
  bNodelay: Boolean;
  TickCount: DWord;
  WR: DWord;
begin
  Result := 0;
  CoInitialize(nil);
  WriteLog('Server thread begin');
  TickCount := GetTickCount();
  try
    while not Terminated do
    begin
      try
        ServerExecLong :=
          ServerExecLong + (GetTickCount() - TickCount);
        WR := WaitForSingleObject(SocketEvent, 10000);
        ServerExecCount := ServerExecCount + 1;
        TickCount := GetTickCount();
        if (WAIT_TIMEOUT = WR) then
        begin
          //nothing to accept for 10 seconds: use the time to check for idle links
          CheckLinkLinkIdleOvertime(Param);
          continue;
        end else
        if (WAIT_FAILED = WR) then
        begin
          continue;
        end else
        begin
          Len := SizeOf(TSockAddrIn);
          AcceptSocket := WSAAccept(ListenSocket, @Addr, @Len, nil, 0);
          if (AcceptSocket = INVALID_SOCKET) then
            continue;
          if LinkCount >= SOCK_MAX_COUNT then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('Link count over');
            end;
            continue;
          end;
          bNodelay := True;
          if SetSockOpt(AcceptSocket, IPPROTO_TCP, TCP_NODELAY,
            PChar(@bNodelay), sizeof(bNodelay)) = SOCKET_ERROR then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('SetSockOpt: error');
            end;
            continue;
          end;
          EnterCriticalSection(LinkSec);
          Link := GetLink();
          with Link^ do
          begin
            EnterCriticalSection(Section);
            RemoteIP := inet_ntoa(Addr.sin_addr);
            RemotePort := Addr.sin_port;
            TickCountActive := GetTickCount();
            Socket := AcceptSocket;
            IOMem := GetIOMem();
            //bind the socket to the completion port with the link as completion key, then post the first receive
            if (CreateIoCompletionPort(AcceptSocket, CompletionPort, DWORD(Link), 0) = 0) or
              (not PostRecv(Link, IOMem)) then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog('CreateIoCompletionPort or PostRecv: error');
              end;
              Socket := INVALID_SOCKET;
              FreeLink(Link);
              FreeIOMem(IOMem);
            end;
            LeaveCriticalSection(Section);
          end;
          LeaveCriticalSection(LinkSec);
        end;
      except
        WriteLog('Server thread error');
      end;
    end;
  finally
    CoUninitialize();
    WriteLog('Server thread end');
    SetEvent(ServerFinished);
  end;
end;

function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
var
  NonBlock: Integer;
  bNodelay: Boolean;
  Addr: TSockAddrIn;
  ThreadHandle: THANDLE;
begin
  Result := ListenSocket = INVALID_SOCKET;
  if not Result then
    exit;
  IniIOMem();
  IniLink();
  ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
  begin
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  bNodelay := True;
  NonBlock := 1;
  Addr.sin_family := AF_INET;
  Addr.sin_addr.s_addr := inet_addr(PChar(RemoteIP));
  Addr.sin_port := htons(RemotePort);
  Result :=
    (SetSockOpt(ListenSocket, IPPROTO_TCP, TCP_NODELAY, PChar(@bNodelay), sizeof(bNodelay)) <> SOCKET_ERROR) and
    (ioctlsocket(ListenSocket, Integer(FIONBIO), NonBlock) <> SOCKET_ERROR) and
    (Bind(ListenSocket, @Addr, SizeOf(TSockAddrIn)) <> SOCKET_ERROR) and
    (Listen(ListenSocket, SOMAXCONN) <> SOCKET_ERROR);
  if not Result then
  begin
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  SocketEvent := CreateEvent(nil, FALSE, FALSE, nil);
  Result := (SocketEvent <> WSA_INVALID_EVENT);
  if (not Result) then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  Result := (WSAEventSelect(ListenSocket, SocketEvent, FD_ACCEPT) <> SOCKET_ERROR);
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Result := CompletionPort <> 0;
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  WriteLog('Server Start');
  CreateWorkerThread(CompletionPort);
  ServerFinished := CreateEvent(nil, True, False, nil);
  Result := ServerFinished <> 0;
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;
  Terminated := False;
  ThreadHandle := CreateThread(nil, 0, @ServerThread, OnServerThreadCreateEvt(False), 0, ServerThreadID);
  if (ThreadHandle = 0) then
  begin
    StopTcpServer();
    exit;
  end;
  CloseHandle(ThreadHandle);
end;

function StopTcpServer(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
    exit;
  WriteLog('Server Stop');
  Terminated :=
    True;
  if ServerFinished <> 0 then
  begin
    WaitForSingleObject(ServerFinished, INFINITE);
    CloseHandle(ServerFinished);
    ServerFinished := 0;
  end;
  if SocketEvent <> 0 then
    WSACloseEvent(SocketEvent);
  SocketEvent := 0;
  DestroyWorkerThread();
  if ListenSocket <> INVALID_SOCKET then
    CloseSocket(ListenSocket);
  ListenSocket := INVALID_SOCKET;
  if CompletionPort <> 0 then
    CloseHandle(CompletionPort);
  CompletionPort := 0;
  ServerExecCount := 0;
  ServerExecLong := 0;
  DeleteLink();
  DeleteIOMem();
end;

function GetLocalIP(IsIntnetIP: Boolean): String;
type
  TaPInAddr = Array[0..10] of PInAddr;
  PaPInAddr = ^TaPInAddr;
var
  phe: PHostEnt;
  pptr: PaPInAddr;
  Buffer: Array[0..63] of Char;
  I: Integer;
begin
  Result := '0.0.0.0';
  try
    GetHostName(Buffer, SizeOf(Buffer));
    phe := GetHostByName(buffer);
    if phe = nil then
      Exit;
    pPtr := PaPInAddr(phe^.h_addr_list);
    if IsIntnetIP then
    begin
      //walk the whole address list and keep the last entry
      I := 0;
      while pPtr^[I] <> nil do
      begin
        Result := inet_ntoa(pptr^[I]^);
        Inc(I);
      end;
    end else
      Result := inet_ntoa(pptr^[0]^);
  except
  end;
end;

procedure SetEventProc(OnReceive: TOnReceiveEvt;
  OnDisconnect: TOnDisconnectEvt;
  OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
  OnServerThreadCreate: TOnThreadCreateEvt;
  OnWorkerThreadCreate: TOnThreadCreateEvt);
begin
  OnReceiveEvt := OnReceive;
  OnDisconnectEvt := OnDisconnect;
  OnLinkIdleOvertimeEvt := OnLinkIdleOvertime;
  OnServerThreadCreateEvt := OnServerThreadCreate;
  OnWorkerThreadCreateEvt := OnWorkerThreadCreate;
end;

function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
var
  Flags: DWord;
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
    try
      Flags := 0;
      Bytes := 0;
      //the TIOInfo header sits immediately before the data area
      IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
      with IOInfo^ do
      begin
        ZeroMemory(IOInfo, sizeof(TIOInfo));
        DataBuf.buf := IOMem;
        DataBuf.len := IO_MEM_SIZE;
        Socket := Link^.Socket;
        Flag := IO_READ;
        Result := (WSARecv(Socket, @DataBuf, 1, @Bytes, @Flags, @Overlapped, nil) <> SOCKET_ERROR) or
          (WSAGetLastError() = ERROR_IO_PENDING);
      end;
    except
      Result := False;
      WriteLog('PostRecv: error');
    end;
end;

function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
var
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
    try
      Bytes := 0;
      IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
      with IOInfo^ do
      begin
        ZeroMemory(IOInfo, sizeof(TIOInfo));
        DataBuf.buf := IOMem;
        DataBuf.len := Len;
        Socket := Link^.Socket;
        Flag := IO_WRITE;
        Result := (WSASend(Socket, @(DataBuf), 1, @Bytes, 0, @(Overlapped), nil) <> SOCKET_ERROR) or
          (WSAGetLastError() = ERROR_IO_PENDING);
      end;
    except
      Result := False;
      WriteLog('PostSend: error');
    end;
end;

procedure PostBroadcast(Buf: PByte; Len: Integer);
var
  IOMem: Pointer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  Link := LinksHead;
  while Link <> nil do
    with Link^ do
    begin
      if Socket <> INVALID_SOCKET then
      begin
        IOMem := GetIOMem();
        CopyMemory(IOMem, Buf, Len);
        if not PostSend(Link, IOMem, Len) then
          FreeIOMem(IOMem);
      end;
      Link := Link^.Next;
    end;
  LeaveCriticalSection(LinkSec);
end;

function IsTcpServerActive(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
end;

{===============================================================================
  Log management
================================================================================}
var
  LogSec: TRTLCriticalSection;
  Inifile: TIniFile;
  LogCount: Integer = 0;
  LogName: String = '';

procedure WriteLog(Log: String);
begin
  EnterCriticalSection(LogSec);
  try
    LogCount := LogCount + 1;
    IniFile.WriteString(LogName, 'Index' + IntToStr(LogCount), DateTimeToStr(Now()) + ':' + Log);
  finally
    LeaveCriticalSection(LogSec);
  end;
end;

{===============================================================================
  Windows Sockets initialization
================================================================================}
var
  WSAData: TWSAData;

procedure Startup;
var
  ErrorCode: Integer;
begin
  ErrorCode := WSAStartup({ the leading digits of the version word are missing in the source } 01,
    WSAData);
  if ErrorCode <> 0 then
    WriteLog('Window Socket init Error!');
end;

procedure Cleanup;
var
  ErrorCode: Integer;
begin
  ErrorCode := WSACleanup;
  if ErrorCode <> 0 then
    WriteLog('Window Socket cleanup error!');
end;

function GetExePath(): String;
var
  ModuleName: array[0..1024] of char;
begin
  GetModuleFileName(MainInstance, ModuleName, SizeOf(ModuleName));
  Result := ExtractFilePath(ModuleName);
end;

initialization
  LogName := DateTimeToStr(Now());
  InitializeCriticalSection(LogSec);
  ExePath := GetExePath();
  IniFile := TIniFile.Create(ExePath + 'Logs.Ini');
  Startup();

finalization
  Cleanup();
  DeleteCriticalSection(LogSec);
  IniFile.Destroy();

end.

Main window unit source:

unit uMainTcpServerIOCP;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, ExtCtrls, StdCtrls, ComCtrls, UTcpServer, Sockets, Grids;

type
  TfrmMainUTcpServerIOCP = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    edtIP: TEdit;
    edtPort: TEdit;
    btn: TButton;
    Timer1: TTimer;
    Label3: TLabel;
    lbIO: TLabel;
    Label5: TLabel;
    lbIOU: TLabel;
    Label7: TLabel;
    lbL: TLabel;
    Label9: TLabel;
    lbLU: TLabel;
    Label11: TLabel;
    lbLS: TLabel;
    Label13: TLabel;
    lbW: TLabel;
    Info: TStringGrid;
    Label4: TLabel;
    lbWC: TLabel;
    Label8: TLabel;
    lbWU: TLabel;
    Label12: TLabel;
    lbLF: TLabel;
    Label15: TLabel;
    lbLFL: TLabel;
    Label6: TLabel;
    lbIOF: TLabel;
    lbIOFL: TLabel;
    Label16: TLabel;
    Timer2: TTimer;
    procedure btnClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer2Timer(Sender: TObject);
  private
    { Private declarations }
    FTickCount: DWord;
  public
    { Public declarations }
  end;

var
  frmMainUTcpServerIOCP: TfrmMainUTcpServerIOCP;

implementation

{$R *.dfm}

{ TfrmMainUTcpServerIOCP }

procedure TfrmMainUTcpServerIOCP.btnClick(Sender: TObject);
var
  i: Integer;
  C1: Integer;
  C2: DWord;
  DT: TDateTime;
begin
  if btn.Caption = 'Open' then
  begin
    StartTcpServer(edtIP.Text, StrToInt(edtPort.Text));
    if IsTcpServerActive()
    then
    begin
      FTickCount := GetTickCount();
      Info.RowCount := GetWorkerCount() + 1;
      DT := Now();
      for i := 1 to Info.RowCount - 1 do
      begin
        Info.Cells[0, i] := IntToStr(i);
        Info.Cells[1, i] := IntToStr(GetWorkerID(i));
        C1 := GetWorkerExecInfo(i, C2);
        Info.Cells[2, i] := IntToStr(C1);
        Info.Cells[3, i] := '0';
        Info.Cells[4, i] := IntToStr(C2);
        Info.Cells[5, i] := '0';
        Info.Cells[6, i] := DateTimeToStr(DT);
      end;
      Timer1.Enabled := True;
    end;
  end else
  begin
    Timer1.Enabled := False;
    StopTcpServer();
  end;
  if IsTcpServerActive() then
    btn.Caption := 'Close'
  else
    btn.Caption := 'Open';
end;

procedure TfrmMainUTcpServerIOCP.FormCreate(Sender: TObject);
begin
  edtIP.Text := GetLocalIP(False);
  Info.ColCount := 7;
  Info.RowCount := 2;
  Info.ColWidths[0] := 30;
  Info.ColWidths[1] := 30;
  Info.ColWidths[2] := 40;
  Info.ColWidths[3] := 40;
  Info.ColWidths[4] := 30;
  Info.ColWidths[5] := 40;
  Info.ColWidths[6] := 110;
  Info.Cells[0, 0] := '序号'; //index
  Info.Cells[1, 0] := 'ID';
  Info.Cells[2, 0] := '计数'; //execution count
  Info.Cells[3, 0] := '次/S'; //executions per second
  Info.Cells[4, 0] := '时长'; //duration
  Info.Cells[5, 0] := '使用率'; //usage
  Info.Cells[6, 0] := '时间'; //time
end;

procedure TfrmMainUTcpServerIOCP.Timer1Timer(Sender: TObject);
var
  i: Integer;
  Count1, Count2, Count3, TC, TCC: DWord;
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  TC := GetTickCount();
  TCC := TC - FTickCount;
  if TCC = 0 then
    TCC := $FFFFFFFF;
  lbWC.Caption := IntToStr(GetServerExecCount());
  lbWU.Caption := FloatToStrF(GetServerExecLong() / TCC * 100, ffFixed, 10, 3) + '%';
  for i := 1 to Info.RowCount - 1 do
  begin
    Count1 := GetWorkerExecInfo(i, Count2);
    TC := GetTickCount();
    TCC := TC - FTickCount;
    if TCC = 0 then
      TCC := $FFFFFFFF;
    Count3 := StrToInt(Info.Cells[2, i]);
    //refresh the row only when the thread's execution count has changed
    if Count1 <> Count3 then
    begin
      Info.Cells[2, i] := IntToStr(Count1);
      Info.Cells[3, i] := IntToStr(Count1 - Count3);
      Info.Cells[4, i] := IntToStr(Count2);
      Info.Cells[5, i] := FloatToStrF(Count2 / TCC * 100, ffFixed, 10, 1) + '%';
      Info.Cells[6, i] := DateTimeToStr(Now());
    end;
  end;
  FTickCount := TC;
  lbIO.Caption := IntToStr(GetIOMemSize());
  lbIOU.Caption := FloatToStrF(GetIOMemUse(), ffFixed, 10, 3) + '%';
  Count1 := GetIOMemFree();
  lbIOF.Caption := IntToStr(Count1);
  lbIOFL.Caption := FloatToStrF(Count1 / IO_MEM_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbW.Caption := IntToStr(GetWorkerCount());
  lbL.Caption := IntToStr(GetLinkSize());
  Count1 := GetLinkFree();
  lbLF.Caption := IntToStr(Count1);
  lbLFL.Caption := FloatToStrF(Count1 / SOCK_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbLU.Caption := FloatToStrF(GetLinkUse(), ffFixed, 10, 3) + '%';
  lbLS.Caption := IntToStr(GetLinkCount());
end;

procedure TfrmMainUTcpServerIOCP.FormDestroy(Sender: TObject);
begin
  StopTcpServer();
end;

procedure TfrmMainUTcpServerIOCP.Timer2Timer(Sender: TObject);
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  //The text means 'This is data from the server!'; 21 is its length in bytes, presumably in a double-byte ANSI code page such as GBK
  PostBroadcast(PByte(PChar('这是来自服务器的数据!')), 21);
end;

end.
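
The buffer pool idea from section I (reserve everything up front, keep the list node in front of the data area, hand out only the data address) can be tried in isolation from sockets. What follows is a minimal sketch under stated assumptions, not the module's actual code: PoolSketch, TBlock, InitPool, PoolGet, PoolFree, BLOCK_SIZE and BLOCK_COUNT are hypothetical names, the blocks live in a static array rather than on the process heap, and the critical section is left out, so it is only safe from a single thread. Unlike GetIOMem, it returns nil on exhaustion instead of relying on an exception.

```pascal
program PoolSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

const
  BLOCK_SIZE  = 64; // hypothetical; the article's IO_MEM_SIZE is 2048
  BLOCK_COUNT = 4;  // hypothetical; the article's IO_MEM_MAX_COUNT is 10000

type
  PBlock = ^TBlock;
  TBlock = record
    Next: PBlock;   // free-list link; stands in for the TIOInfo header
    Data: array[0..BLOCK_SIZE - 1] of Byte; // the only part callers see
  end;

var
  Blocks: array[0..BLOCK_COUNT - 1] of TBlock; // reserved once, up front
  FreeHead: PBlock = nil;
  FreeTail: PBlock = nil;
  InUse: Integer = 0;
  DataOffset: Integer = 0; // distance from the block start to its Data field

procedure InitPool;
var
  i: Integer;
begin
  DataOffset := Integer(PAnsiChar(@Blocks[0].Data) - PAnsiChar(@Blocks[0]));
  // chain every block into the free list
  for i := 0 to BLOCK_COUNT - 2 do
    Blocks[i].Next := @Blocks[i + 1];
  Blocks[BLOCK_COUNT - 1].Next := nil;
  FreeHead := @Blocks[0];
  FreeTail := @Blocks[BLOCK_COUNT - 1];
  InUse := 0;
end;

// O(1) allocation: pop the head of the free list, hand out the Data address.
function PoolGet: Pointer;
var
  B: PBlock;
begin
  Result := nil;
  B := FreeHead;
  if B = nil then
    Exit; // pool exhausted
  FreeHead := B^.Next;
  if FreeHead = nil then
    FreeTail := nil;
  Inc(InUse);
  Result := @B^.Data;
end;

// O(1) release: step back from the Data address to the block header and
// append the block at the tail of the free list.
procedure PoolFree(Mem: Pointer);
var
  B: PBlock;
begin
  B := PBlock(PAnsiChar(Mem) - DataOffset);
  B^.Next := nil;
  if FreeTail = nil then
    FreeHead := B
  else
    FreeTail^.Next := B;
  FreeTail := B;
  Dec(InUse);
end;

var
  P1, P2: Pointer;
begin
  InitPool;
  P1 := PoolGet;
  P2 := PoolGet;
  WriteLn('in use after two gets: ', InUse);   // prints 2
  PoolFree(P1);
  WriteLn('in use after one free: ', InUse);   // prints 1
  PoolFree(P2);
  WriteLn('in use after both freed: ', InUse); // prints 0
end.
```

Neither operation walks the list, which is the property the article relies on. In a multithreaded server both calls would need to be wrapped in a lock, as GetIOMem and FreeIOMem do with IOMemSec.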