VC驿站

 找回密码
 加入驿站

QQ登录

只需一步,快速开始

有编程疑问吗?还请到提问专区发帖提问!
搜索
查看: 698|回复: 0

WinSock异步IO模型-[5]完成端口 - Completion Port

[复制链接]
51_avatar_middle
online_admins Syc 发表于 2018-1-6 20:24:59 | 显示全部楼层 |阅读模式
█ “完成端口”模型是迄今为止最复杂的一种 I/O 模型。但是,若一个应用程序同时需要管理很多的套接字,
那么采用这种模型,往往可以达到最佳的系统性能!但缺点是,该模型只适用于 Windows NT 和 Windows 2000 以上版本的操作系统。

█ 因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千个套接字的时候,而且希望随着系统内安装的CPU数量的增多,
应用程序的性能也可以线性提升,才应考虑采用“完成端口”模型。

█ 从本质上说,完成端口模型要求我们创建一个 Win32 完成端口对象,通过指定数量的线程,
对重叠 I/O 请求进行管理,以便为已经完成的重叠 I/O 请求提供服务。

█ ※※※ 大家可以这样理解,一个完成端口其实就是一个完成 I/O 的通知队列,由操作系统把已经完成的重叠 I/O 请求的通知放入这个队列中。
当某项 I/O 操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知,工作者线程再去做一些其他的善后工作,
比如:将收到的数据进行显示,等等。而套接字在被创建后,可以在任何时候与某个完成端口进行关联。※※※

通常情况下,我们会在应用程序中创建一定数量的工作者线程来处理这些通知。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。


█ 使用这种模型之前,首先要创建一个 I/O 完成端口对象,用它面向任意数量的套接字句柄,管理多个 I/O 请求。
要做到这一点,需要调用 CreateCompletionPort 函数,其定义如下:
  1. HANDLE WINAPI CreateIoCompletionPort(
  2.   __in          HANDLE FileHandle,
  3.   __in          HANDLE ExistingCompletionPort,
  4.   __in          ULONG_PTR CompletionKey,
  5.   __in          DWORD NumberOfConcurrentThreads
  6. );
复制代码


要注意该函数有两个功能:
● 用于创建一个完成端口对象;
● 将一个句柄同完成端口对象关联到一起。

如果仅仅为了创建一个完成端口对象,唯一注意的参数便是 NumberOfConcurrentThreads(并发线程的数量),前面三个参数可忽略。

NumberOfConcurrentThreads 参数的特殊之处在于,它定义了在一个完成端口上,同时允许执行的线程数量。
理想情况下,我们希望每个处理器各自负责一个线程的运行,为完成端口提供服务,避免过于频繁的线程“场景”(即线程上下文)切换。
若将该参数设为 0,表明系统内安装了多少个处理器,便允许同时运行多少个工作者线程!可用下述代码创建一个 I/O 完成端口:
HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);


★1、工作者线程与完成端口
成功创建一个完成端口后,便可开始将套接字句柄与其关联到一起。但在关联套接字之前,首先必须创建一个或多个“工作者线程”,
以便在 I/O 请求投递给完成端口后,为完成端口提供服务。在这个时候,大家或许会觉得奇怪,到底应创建多少个线程,以便为完成端口提供服务呢?

在此,要记住的一点,我们调用 CreateIoComletionPort 时指定的并发线程数量,与打算创建的工作者线程数量相比,它们代表的不是同一件事情。

CreateIoCompletionPort 函数的 NumberOfConcurrentThreads 参数明确指示系统:
在一个完成端口上,一次只允许 n 个工作者线程运行。假如在完成端口上创建的工作者线程数量超出 n 个,那么在同一时刻,最多只允许n个线程运行。
但实际上,在一段较短的时间内,系统有可能超过这个值,但很快便会把它减少至事先在 CreateIoCompletionPort 函数中设定的值。

那么,为何实际创建的工作者线程数量有时要比 CreateIoCompletionPort 函数设定的多一些呢?这样做有必要吗?

这主要取决于应用程序的总体设计情况。假定我们的某个工作者线程调用了一个函数,比如 Sleep 或 WaitForSingleObject,
进入了暂停(锁定或挂起)状态,那么允许另一个线程代替它的位置。换言之,我们希望随时都能执行尽可能多的线程;
当然,最大的线程数量是事先在 CreateIoCompletonPort 调用里设定好的。这样一来,假如事先预计到自己的线程有可能暂时处于停顿状态,
那么最好能够创建比 CreateIoCompletonPort 的 NumberOfConcurrentThreads 参数的值多的线程,以便到时候充分发挥系统的潜力。

      ==========================================

一旦在完成端口上拥有足够多的工作者线程来为 I/O 请求提供服务,便可着手将套接字句柄同完成端口关联到一起。
这要求我们在一个现有的完成端口上,调用 CreateIoCompletionPort 函数,同时为前三个参数— FileHandle,ExistingCompletionPort 和 CompletionKey—提供套接字的信息。

● FileHandle 参数指定一个要同完成端口关联在一起的套接字句柄;
● ExistingCompletionPort 参数指定的是一个现有的完成端口;
● CompletionKey(完成键)参数指定与某个套接字句柄关联在一起的“单句柄数据”,可将其作为指向一个数据结构的指针,
在此数据结构中,同时包含了套接字的句柄,以及与套接字有关的其他信息,如 IP 地址等。为完成端口提供服务的线程函数可通过这个参数,取得与套接字句柄有关的信息。

根据目前,首先来构建一个基本的应用程序框架。下面的程序清单向
大家阐述了如何使用完成端口模型,来开发一个服务器应用。在这个程序中,
我们按照以下步骤进行:

1) 创建一个完成端口,第四个参数保持为 0,指定在完成端口上,每个处理器一次只允许执行一个工作者线程;
2) 判断系统内到底安装了多少个处理器;
3) 创建工作者线程,根据步骤 2) 得到的处理器信息,在完成端口上,为已完成的 I/O 请求提供服务,在这个简单的例子中,我们为每个处理器都只创建一个工作者线程。
这是由于事先已预计到,到时不会有任何线程进入“挂起”状态,造成由于线程数量的不足,而使处理器空闲的局面(没有足够的线程可供执行)。
调用 CreateThread 函数时,必须同时提供一个工作者例程,由线程在创建好执行;
4) 准备好一个监听套接字,在端口 9527 上监听进入的连接请求;
5) 使用 accept 函数,接受进入的连接请求;
6) 创建一个数据结构,用于容纳“单句柄数据”,同时在结构中存入接受的套接字句柄;
7) 调用 CreateIoCompletionPort 函数,将从 accept 返回的新套接字句柄同完成端口关联到一起,
通过完成键(CompletionKey)参数,将单句柄数据结构传递给 CreateIoCompletionPort 函数;
8) 开始在已接受的连接上进行 I/O 操作,在此,我们希望通过重叠 I/O 机制,在新建的套接字上投递一个或多个异步 WSARecv 或 WSASend 请求。
这些 I/O 请求完成后,一个工作者线程会为 I/O 请求提供服务,同时继续处理未来的其他 I/O 请求,
稍后便会在步骤 3) 指定的工作者例程中,体验到这一点;
9)重复步骤 5) ~ 8),直至服务器中止。

代码如下:
  1. HANDLE CompletionPort;
  2. WSADATA wsd;
  3. SYSTEM_INFO SystemInfo;
  4. SOCKADDR_IN InternetAddr;
  5. SOCKET Listen;
  6. int i;

  7. typedef struct _PER_HANDLE_DATA
  8. {
  9.         SOCKET                Socket;
  10.         SOCKADDR_STORAGE  ClientAddr;
  11.         // Other information useful to be associated with the handle
  12. } PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

  13. // Load Winsock
  14. StartWinsock(MAKEWORD(2,2), &wsd);

  15. // Step 1:
  16. //  创建一个完成端口

  17. CompletionPort = CreateIoCompletionPort(
  18.     INVALID_HANDLE_VALUE, NULL, 0, 0);

  19. // Step 2:
  20. // 判断系统内到底安装了多少个处理器

  21. GetSystemInfo(&SystemInfo);

  22. // Step 3:
  23. // 根据处理器的数量创建工作者线程

  24. for(i = 0; i < SystemInfo.dwNumberOfProcessors; i++)
  25. {
  26.     HANDLE ThreadHandle;

  27.     // Create a server worker thread, and pass the
  28.     // completion port to the thread. NOTE: the
  29.     // ServerWorkerThread procedure is not defined
  30.     // in this listing.

  31.     ThreadHandle = CreateThread(NULL, 0,
  32.         ServerWorkerThread, CompletionPort,
  33.         0, NULL;

  34.     // Close the thread handle
  35.     CloseHandle(ThreadHandle);
  36. }

  37. // Step 4:
  38. // 准备好一个监听套接字

  39. Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,
  40.     WSA_FLAG_OVERLAPPED);

  41. InternetAddr.sin_family = AF_INET;
  42. InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
  43. InternetAddr.sin_port = htons(9527);
  44. bind(Listen, (PSOCKADDR) &InternetAddr,
  45.     sizeof(InternetAddr));

  46. // Prepare socket for listening

  47. listen(Listen, 5);

  48. while(TRUE)
  49. {
  50.     PER_HANDLE_DATA *PerHandleData=NULL;
  51.     SOCKADDR_IN saRemote;
  52.     SOCKET Accept;
  53.     int RemoteLen;
  54.     // Step 5:
  55.     // 接受客户端的连接

  56.     RemoteLen = sizeof(saRemote);
  57.     Accept = WSAAccept(Listen, (SOCKADDR *)&saRemote,
  58.     &RemoteLen);

  59.     // Step 6:
  60.     // 创建一个数据结构,用于容纳“单句柄数据”
  61.     PerHandleData = (LPPER_HANDLE_DATA)
  62.         GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));

  63.     printf("Socket number %d connected\n", Accept);
  64.     PerHandleData->Socket = Accept;
  65.     memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);

  66.     // Step 7:
  67.     // 调用 CreateIoCompletionPort 函数,将从 accept 返回的新套接字句柄同完成端口关联到一起

  68.     CreateIoCompletionPort((HANDLE) Accept,
  69.         CompletionPort, (DWORD) PerHandleData, 0);

  70.     // Step 8:
  71.     //  开始在已接受的连接上进行 I/O 操作
  72.     WSARecv(...);
  73. }

  74. DWORD WINAPI ServerWorkerThread(LPVOID lpParam)
  75. {
  76.     // The requirements for the worker thread will be
  77.     // discussed later.
  78.     return 0;
  79. }
复制代码


★2、完成端口和重叠 I/O(工作者线程要做的事情)
将套接字句柄与一个完成端口关联在一起后,便可投递发送与接收请求,开始对 I/O 请求的处理。
接下来,可开始依赖完成端口,来接收有关 I/O 操作完成情况的通知。
从本质上说,完成端口模型利用了 Win32 重叠 I/O 机制。在这种机制中,象 WSASend 和 WSARecv 这样的 WinsockAPI 调用会立即返回。
此时,需要由我们的应用程序负责在以后的某个时间,通过一个 OVERLAPPED 结构,来接收之前调用请求的结果。

在完成端口模型中,要想做到这一点,需要使用 GetQueuedCompletionStatus(获取排队完成状态)函数,
让一个或多个工作者线程在完成端口上等待 I/O 请求完成的通知。该函数的定义如下:
  1. BOOL WINAPI GetQueuedCompletionStatus(
  2.   __in          HANDLE CompletionPort,
  3.   __out         LPDWORD lpNumberOfBytes,
  4.   __out         PULONG_PTR lpCompletionKey,
  5.   __out         LPOVERLAPPED* lpOverlapped,
  6.   __in          DWORD dwMilliseconds
  7. );
复制代码

● CompletionPort 参数对应于要在上面等待的完成端口;
● lpNumberOfBytes 参数负责在完成了一次 I/O 操作后(如:WSASend 或 WSARecv),接收实际传输的字节数。
● lpCompletionKey 参数为原先传递给CreateIoCompletionPort 函数第三个参数“单句柄数据”,如我们早先所述,大家最好将套接字句柄保存在这个“键”(Key)中。
● lpOverlapped 参数用于接收完成 I/O 操作的重叠结果。这实际是一个相当重要的参数,因为可用它获取每个 I/O 操作的数据。
● dwMilliseconds 参数用于指定希望等待一个完成数据包在完成端口上出现的时间,即,超时时间。假如将其设为 INFINITE,会一直等待下去。

★3、“单句柄数据”和 单 I/O 操作数据
一个工作者线程从 GetQueuedCompletionStatus 函数接收到 I/O 完成通知后,在 lpCompletionKey 和 lpOverlapped 参数中,
会包含一些重要的套接字信息。利用这些信息,可通过完成端口,继续在一个套接字上进行其他的处理。

通过这些参数,可获得两方面重要的套接字数据:“单句柄数据”以及单 I/O 操作数据。

其中,lpCompletionKey参数包含了“单句柄数据”,因为在一个套接字首次与完成端口关联到一起的时候,
那些数据便与一个特定的套接字句柄对应起来了。这些数据正是我们在调用 CreateIoCompletionPort 函数时候,通过 CompletionKey 参数传递的。
通常情况下,应用程序会将与 I/O 请求有关的套接字句柄及其他的一些相关信息保存在这里;

lpOverlapped 参数则包含了一个 OVERLAPPED 结构,在它后面跟随“单 I/O 操作数据”。
单 I/O 操作数据可以是追加到一个 OVERLAPPED 结构末尾的、任意数量的字节。
假如一个函数要求用到一个 OVERLAPPED 结构,我们便必须将这样的一个结构传递进去,以满足它的要求。
要想做到这一点,一个简单的方法是定义一个结构,然后将 OVERLAPPED 结构作为新结构的第一个元素使用。
举个例子来说,可定义下述数据结构,实现对单 I/O 操作数据的管理:
  1. typedef struct
  2. {
  3.         OVERLAPPED        Overlapped;
  4.         WSABUF                        DataBuf;
  5.         char                                        szBuffer[DATA_BUF_SIZE];
  6.         int                                        OperationType;
  7. } PER_IO_OPERATION_DATA;
复制代码


该结构演示了通常与 I/O 操作关联的一些重要的数据元素,比如刚才完成的那个 I/O 操作的类型(发送或接收请求),用 OperationType 字段表示,
同时,用于已完成 I/O 操作数据的缓冲区 szBuffer 也是非常有用的。如果想调用一个 Winsock API 函数(如:WSASend、WSARecv),要为其分配一个 OVERLAPPED 结构,
这时,就可以将我们的结构强制转换成一个 OVERLAPPED 指针,或者从结构中将 OVERLAPPED 元素的地址取出来。如下例所示:
PER_IO_OPERATION_DATA PerIoData;
……
//可以这样调用:
WSARecv(socket, ..., (OVERLAPPED *)&PerIoData);
//也可以这样调用:
WSARecv(socket, ..., &(PerIoData.Overlapped));

在工作线程的后面部分,等 GetQueuedCompletionStatus 函数返回了一个重叠结构(和完成键)后,
便可通过 OperationType 成员,看出到底是哪个操作投递到了这个句柄之上(只需将返回的重叠结强制转换为自己的 PER_IO_OPERATION_DATA 结构)。
对单 I/O 操作数据来说,它最大的一个优点便是允许我们在同一个句柄上,同时管理多个 I/O 操作(读/写,多个读,多个写,等等)。

  1. DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
  2. {
  3.     HANDLE CompletionPort = (HANDLE) CompletionPortID;
  4.     DWORD BytesTransferred;
  5.     LPOVERLAPPED Overlapped;
  6.     LPPER_HANDLE_DATA PerHandleData;
  7.     LPPER_IO_DATA PerIoData;
  8.     DWORD SendBytes, RecvBytes;
  9.     DWORD Flags;
  10.    
  11.     while(TRUE)
  12.     {
  13.         // Wait for I/O to complete on any socket
  14.         // associated with the completion port
  15.    
  16.         ret = GetQueuedCompletionStatus(CompletionPort,
  17.             &BytesTransferred,(LPDWORD)&PerHandleData,
  18.             (LPOVERLAPPED *) &PerIoData, INFINITE);

  19.         // First check to see if an error has occurred
  20.         // on the socket; if so, close the
  21.         // socket and clean up the per-handle data
  22.         // and per-I/O operation data associated with
  23.         // the socket

  24.         if (BytesTransferred == 0 &&
  25.             (PerIoData->OperationType == RECV_POSTED ││
  26.              PerIoData->OperationType == SEND_POSTED))
  27.         {
  28.             // A zero BytesTransferred indicates that the
  29.             // socket has been closed by the peer, so
  30.             // you should close the socket. Note:
  31.             // Per-handle data was used to reference the
  32.             // socket associated with the I/O operation.

  33.             closesocket(PerHandleData->Socket);

  34.             GlobalFree(PerHandleData);
  35.             GlobalFree(PerIoData);
  36.             continue;
  37.         }

  38.         // Service the completed I/O request. You can
  39.         // determine which I/O request has just
  40.         // completed by looking at the OperationType
  41.         // field contained in the per-I/O operation data.
  42.          if (PerIoData->OperationType == RECV_POSTED)
  43.         {
  44.             // Do something with the received data
  45.             // in PerIoData->Buffer
  46.         }

  47.         // Post another WSASend or WSARecv operation.
  48.         // As an example, we will post another WSARecv()
  49.         // I/O operation.

  50.         Flags = 0;

  51.         // Set up the per-I/O operation data for the next
  52.         // overlapped call
  53.         ZeroMemory(&(PerIoData->Overlapped),
  54.             sizeof(OVERLAPPED));

  55.         PerIoData->DataBuf.len = DATA_BUFSIZE;
  56.         PerIoData->DataBuf.buf = PerIoData->Buffer;
  57.         PerIoData->OperationType = RECV_POSTED;

  58.         WSARecv(PerHandleData->Socket,
  59.             &(PerIoData->DataBuf), 1, &RecvBytes,
  60.             &Flags, &(PerIoData->Overlapped), NULL);
  61.     }
  62. }
复制代码

★4、正确地关闭 I/O 完成端口
如何正确地关闭 I/O 完成端口,特别是同时运行了一个或多个线程,在几个不同的套接字上执行 I/O 操作的时候。
要避免的一个重要问题是在进行重叠 I/O 操作的同时,强行释放一个 OVERLAPPED 结构。
要想避免出现这种情况,最好的办法是针对每个套接字句柄,调用 closesocket 函数,任何尚未进行的重叠 I/O 操作都会完成。一旦所有套接字句柄都已关闭,
便需在完成端口上,终止所有工作者线程的运行。要想做到这一点,需要使用 PostQueuedCompletionStatus 函数,向每个工作者线程都发送一个特殊的完成数据包。
该函数会指示每个线程都“立即结束并退出”。下面是 PostQueuedCompletionStatus 函数的定义:
  1. BOOL WINAPI PostQueuedCompletionStatus(
  2.   __in          HANDLE CompletionPort,
  3.   __in          DWORD dwNumberOfBytesTransferred,
  4.   __in          ULONG_PTR dwCompletionKey,
  5.   __in          LPOVERLAPPED lpOverlapped
  6. );
复制代码

● CompletionPort 参数指定想向其发送一个完成数据包的完成端口对象;
● 而就 dwNumberOfBytesTransferred、dwCompletionKey 和 lpOverlapped 三个参数来说,每一个都允许我们指定一个值,
直接传递给 GetQueuedCompletionStatus 函数中对应的参数。这样一来,一个工作者线程收到传递过来的三个 GetQueuedCompletionStatus 函数参数后,
便可根据由这三个参数的某一个设置的特殊值,决定何时或者应该怎样退出。
例如,可用 dwCompletionPort 参数传递 0 值,而一个工作者线程会将其解释成中止指令。
一旦所有工作者线程都已关闭,便可使用 CloseHandle 函数,关闭完成端口,最终安全退出程序。

本套教程由VC驿站原创,完全免费!
相关课程演示细节还请观看视频教程!
教程下载地址:http://www.cctry.com/thread-2773-1-1.html




上一篇:WinSock异步IO模型-[4]重叠IO - Overlapped IO
下一篇:WinSock异步IO模型-[5]完成端口之线程池技术

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你已经在论坛发帖求助,并且从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友?可以给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

您需要登录后才可以回帖 登录 | 加入驿站 qq_login

本版积分规则

关闭

站长提醒上一条 /1 下一条

QQ
QQ在线咨询
联系电话
13591366679
手机扫一扫 关注本站精彩内容
wxqrcode

QQ|小黑屋|手机版|VC驿站 ( 辽ICP备09019393号tongdun|网站地图wx_jqr

GMT+8, 2018-10-16 04:25

Powered by Discuz! X3.4

© 2009-2018 cctry.com

快速回复 返回顶部 返回列表