DWORD SleepEx(
DWORD dwTimeout,
BOOL fAlertable);
DWORD WaitForSingleObjectEx(
HANDLE hObject,
DWORD dwTimeout,
BOOL fAlertable);
DWORD WaitForMultipleObjectsEx(
DWORD cObjects,
PHANDLE phObjects,
BOOL fWaitAll,
DWORD dwTimeout,
BOOL fAlertable);
BOOL SignalObjectAndWait(
HANDLE hObjectToSignal,
HANDLE hObjectToWaitOn,
DWORD dwMilliseconds,
BOOL fAlertable);
DWORD MsgWaitForMultipleObjectsEx(
DWORD nCount,
PHANDLE pHandles,
DWORD dwMilliseconds,
DWORD dwWakeMask,
DWORD dwFlags);
前面四个函数的最后一个参数为布林值,指示呼叫线程是否应该把自己加进可警告的状态。对MsgWaitForMultipleObjectsEx,您必须使用MWMO_ALERTABLE标记,以让线程进入可警告的状态。假如您熟悉Sleep、WaitForSingleObject及WaitForMultipleObjects函数,您就不应该在学习它们时感到惊讶。在内部,这些总是传递FALSE以作为fAlertable参数之non-Ex函数以呼叫它们的*Ex副本。
当您呼叫刚才提到的五个函数中的一个,并且将线程加进可警告的状态时,系统首先会检查线程的APC伫列。假如至少有一个入口在伫列中,系统不会将线程放进睡眠状态;取代它的是,系统从APC伫列中拉出入口,而且线程呼叫收回例行程序,以传递完成I/O请求的错误程序代码、转移位元组的数量及OVERLAPPED结构的位址到例行程序中。当收回例行程序返回到系统时,系统会检查APC伫列中的更多入口。假如存在更多入口,则它们会被处理。然而,假如不再有入口存在,则可警告的函数呼叫会返回。有几件要记住的事情,假如当您呼叫这些函数中的任何一个时,只要有入口存在线程的APC伫列中,您的线程绝不会进入睡眠状态!
这些函数唯一会暂停线程的时间是当您在线程的APC伫列中没有入口时呼叫函数时。当线程被暂停时,假如您等待的核心物件(或物件)变成通知状态或是APC入口在线程的伫列中出现时,则线程将会醒来。由于线程在可警告的状态中,APC入口一出现,系统立即把您的线程叫醒并且清空伫列(经由呼叫收回例行程序)。然后函数立即返回呼叫者处-即您的线程不会回到等待核心物件变成通知状态时的睡眠状态。
从这五个函数传回的值指出它们为什么会返回的原因。假如传回值是WAIT_IO_COMPLETION,您可以知道线程正在继续执行,因为至少有一个来自线程的APC伫列的入口被处理。假如传回值是别的结果,因为睡眠期间到期所以线程是醒着的;指定的核心物件或物件会变成通知状态。
可警告I/O之利与弊
我们已经讨论过执行可警告I/O的机制。现在,您必须知道有两个因为实行设备I/O而使可警告I/O方法变得很糟的相关议题。
- 回呼函数 为了建立回呼函数而发出可警告I/O请求将使您在实作程序代码变得更加困难。在特殊问题方面,这些回呼函数通常不会有足够资讯指引您,因此总有一天您会将许多资讯放置到全域变数中。幸运的是,这些全域变数并不需要被同步,因为呼叫其中一个可警告函数的线程和执行回呼函数的线程是相同的。单一的线程不能于同一时间存在两个位置中,所以变数是安全的。
- 线程的发布 可警告I/O的真正问题是:发布I/O请求的线程也必须处理完成通知。假如某个线程发布了几个请求,即使它已经进入完全闲置的状态也必须回答每个请求的完成通知。由于没有载入平衡,所以应用程序并没有调适的很好。
这两个问题是相当严重的,因此我强烈地劝阻因为设备I/O而使用可警告I/O的做法。相信您正在想下一节所讨论的I/O完成端口机制能解决我刚才所讨论的这两个问题,但在进行到I/O完成端口之前,我会先说明一些有关可警告I/O之基础建设的不错元素。
Windows提供了可让您手动将一个入口放入线程APC伫列的函数:
DWORD QueueUserAPC(
PAPCFUNC pfnAPC,
HANDLE hThread,
ULONG_PTR dwData);
第一个参数是指向APC函数的指标,必须有下列的原型:
VOID WINAPI APCFunc(ULONG_PTR dwParam);
第二个参数是您想要放入伫列入口的线程handle。请注意,这个线程可以是系统中的任何一个。假如hThread指出了某个在不同程序位址空间的线程,则pfnAPC就必须指出在目标线程之行程位址空间中的函数内存位址。QueueUserAPC的最后一个参数dwData,是简单地传递给回呼函数的值。
虽然QueueUserAPC传回DWORD的原型,但函数实际上会传回一个BOOL来指示成功或失败。您可以看到QueueUserAPC非常有效地执行线程之间的通讯,甚至跨越了程序的界限。然而不幸的是您只能传递一个单一的值。
QueueUserAPC也可以用来强迫线程从等待状态中离开。假定您有某个线程呼叫了WaitForSingleObject以等待核心物件变成通知状态,当线程正在等待时,使用者想要终止应用程序;您知道线程应该乾净地毁坏它们自己,但您如何强迫伺候核心物件的线程醒来并且终止自己呢?QueueUserAPC就是答案。
以下的程序代码示范了如何强迫线程从等待状态中离开以便线程可以完全清除自己并离开。主要的函数产生了一个新的线程,并传递某些核心物件的handle给它。当第二个线程在执行时,主要的线程也在执行。第二个线程(执行ThreadFunc函数)呼叫暂停线程之WaitForSingleObjectEx并将它加进可警告的状态。然后由使用者命令主要的线程终止应用程序。当然,主要线程可以合理的存在且系统会终止整个行程,但这个应用程序不会被清除的非常乾净,而且在许多情况下,您只是想在未终止整个行程前即删除某个操作。
因此,呼叫QueueUserAPC的主要线程放置了一个APC入口到第二个线程的APC伫列中。由于第二个线程为可警告的状态,所以经由呼叫函数可使它醒来并且清空它的APC伫列。这个函数完全没做事即返回。由于APC伫列现在是空的,使用它传回的伫列所跟随的一个WAIT_IO_COMPLETION传回值来呼叫WaitForSingleObjectE。ThreadFunc函数会特别地检查这个传回值以知道它接收了一个指出线程应该存在的APC入口。
// APC回呼函数没有做任何事
VOID WINAPI APCFunc(ULONG_PTR dwParam) {
// 这里没有什么要做的
}
UINT WINAPI ThreadFunc(PVOID pvParam) {
HANDLE hEvent = (HANDLE) pvParam; // Handle被传递此线程
// 在可警告的状态中等待以便我们可以被强迫清除自己并离开
DWORD dw = WaitForSingleObjectEx(hEvent, INFINITE, TRUE);
if (dw == WAIT_OBJECT_0) {
// 物件变成通知状态
}
if (dw == WAIT_IO_COMPLETION) {
// QueueUserAPC强迫我们自等待状态中离开
return(0); // 线程被乾净地删除
}
.
.
.
return(0);
}
void main() {
HANDLE hEvent = CreateEvent(...);
HANDLE hThread = (HANDLE)_beginthreadex(NULL, 0,
ThreadFunc, (PVOID) hEvent, 0, NULL);
.
.
.
// 强迫第二个线程清除自己并离开
QueueUserAPC(APCFunc, hThread, NULL);
WaitForSingleObject(hThread, INFINITE);
CloseHandle(hThread);
CloseHandle(hEvent);
}我知道有些人会认为这个问题可以经由呼叫WaitFo-MultipleObjects来取代WaitForSingleObjectEx,以及建立另一个事件核心物件来发出第二个线程终止的通知而使此问题被解决。以我举的例子来说,您的解决办法可能会有用。然而,假设第二个线程呼叫了WaitForMultipleObjects,等待所有的物件变成通知状态为止;此时QueueUserAPC会是强迫线程自等待状态中离开的唯一方法。
关于如何以这种方式使用QueueUserAPC的范例,请看本书第五章中的 RegNotify 范例应用程序,以及《Programming Applications for Microsoft Windows, Fourth Edition》(Jeffrey Richter, Microsoft Press, 1999)书中第十一章的WaitForMultipleExpressions范例应用程序。
I/O完成端口
Windows 2000被设计成一个安全、强健的作业系统,用来正确地执行服务数千个使用者的应用程序。依据以往的经验来说,您已经能够经由以下二种模式之一创造出服务应用程序:
- 序列模式 等待客户端产生一个请求之单一线程(通常是在网路上)。当请求到达时,线程会醒来并且处理客户端的请求。
- 协同模式 等待客户端的请求之单一线程,并且建立一个新的线程处理请求。当新的线程正在处理客户端的请求时,最初的线程会回到最初的状态并且等待另一个客户端请求。当处理客户端请求的线程处理完成,该线程就被清除。
序列模式的问题是它对多重、同步的请求处理得不是很好。假如两个客户端在相同时间产生请求,则一次只能处理一个,第二个请求必须等待第一个请求结束行程。一个设计为使用序列方式的服务不能用于多重处理器。显然地,序列模式只在最简单的服务器应用程序上有效,在那里,少数的客户端请求被产生而且可被非常快速地处理,Ping服务器是即序列服务器的一个例子。
因为序列模式中的限制,使得协同模式非常受欢迎。在协同模式中,为了处理每个客户端请求而产生线程。其优点是等待请求之线程所做的工作很少。大部分的时间里,线程是处于闲置状态的。当某个客户端请求到达时,线程会醒来并且产生一个新的线程以处理请求,然后继续等待另一个客户端请求。这意味着到达的客户端请求可被方便地处理。再者,因为每个客户端请求皆可取得它自己的线程,所以服务器应用程序可调适得很好而且可以简单地利用多重处理器。因此,假如您正在使用协同模式且更新了硬体设备(增加另一个CPU),服务器应用程序效能可被增加。
服务器应用程序使用Windows来实作协同模式。Windows团队注意到应用程序效能并没有如想像中的高。尤其是注意到处理许多同时产生的请求即表示有许多线程在系统上。因为这些线程皆是可执行的(不暂停且等待某些事发生),所以Microsoft了解到Windows核心花了太多时间在线程之间交换,而且线程并没有取得足够的CPU时间执行工作。为了使Windows成为更完善之服务器环境,Microsoft必须正视这个问题,而结果就是I/O完成端口核心物件。
产生I/O完成端口
I/O完成端口之背后的理论即是同时执行的线程数量必须有个上限;即500个同时发生的客户端请求不能允许500可执行的线程同时存在。那么,什么是适当的可执行之线程数量?假如您曾经思考这个问题,那么您将会领悟到如果一台有二个CPU的机器,拥有多于二个可执行之线程(即每个处理器一个)是没有任何意义的。一旦您所拥有的可执行之线程多于可用的CPU数,则系统必须延长执行中的线程内容交换时间,因而浪费了宝贵的CPU周期-此为协同模式的潜在缺点。另一个协同模式的缺点是它会为每个客户端请求建立一个新的线程。与使用它自己之虚拟位址空间来建立一个新的程序相比较,建立一个线程较不费力,但是线程的建立方式并不自由。假如在应用程序初始化时建立线程的集区,则可以改进服务应用程序的效能,而且这些线程会因为应用程序的持续而四处徘徊。I/O完成端口被设计为与线程的集区一起工作。
I/O完成端口可能是最复杂的核心物件。为了要建立一个I/O完成端口,您要呼叫CreateIoCompletionPort:
HANDLE CreateIoCompletionPort(
HANDLE hfile,
HANDLE hExistingCompPort,
ULONG_PTR CompKey,
DWORD dwNumberOfConcurrentThreads);
这个函数执行了二个不同的任务:它建立一个I/O完成端口并把设备和I/O完成端口联系在一起。这个函数非常地复杂,依我的看法,Microsoft应该将它切成二个独立的函数。当我在使用I/O完成端口工作时,会经由建立两个抽象化CreateIoCompletionPort的极小程序来分割这两个功能。我编写的第一个函数称为CreateNewCompletionPort并且实作如下:
HANDLE CreateNewCompletionPort(DWORD dwNumberOfConcurrentThreads) {
return(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
dwNumberOfConcurrentThreads));
}这个函数会接收一个参数dwNumberOfConcurrentThreads,然后呼叫Windows之CreateIoCompletionPort函数,并且传递硬体呼叫的值作为前叁个参数,而dwNumberOfConcurrentThreads为最后一个参数。您可以看到CreateIoCompletionPort的前叁个参数只有在当您将设备和完成端口联系在一起时才会被使用(我将会作简短地谈论)。为了要建立一个完成端口,我分别地传递了INVALID_HANDLE_VALUE、NULL、和0给CreateIoCompletionPort的前叁个参数。
dwNumberOfConcurrentThreads参数告诉I/O完成端口在同一时间可执行之线程最大数量。假如您传递0给dwNumberOfConcurrentThreads参数,那么完成端口之预设值是允许与主机上之CPU数量一样的线程同时发生。这通常会是您所要的,以便避免过度的内文交换的情形发生。假如客户端请求的程序需要一个冗长的计算而只有很少的区块可用时,您可能会想要增加这个值,但是增加这个值会使您相当沮丧。您可以经由测试不同的值和比较应用程序效能来对dwNumberOfConcurrentThreads参数进行试验。
您会注意到CreateIoCompletionPort几乎是建立核心物件唯一的一个Windows函数,但却不能让您传递SECURITY_ATTRIBUTES结构位址,这是因为完成端口本意是只在单一的程序中使用。当我说明如何使用完成端口时,您会清楚地知道这个原因。
联系设备与I/O完成端口
当您建立了一个I/O完成端口,核心实际上会建立五个不同的资料结构,如图2-1所示。在您继续阅读时会参考到这个图。
| 图2-1 I/O完成端口之内部操作 |
第一个资料结构是指出设备或把设备和通讯埠联系在一起的设备清单。您可以经由呼叫CreateIoCompletionPort而把设备和通讯埠联系在一起。再者,我建立了自己的函数,AssociateDevice-WithCompletionPort,以抽象化CreateIoCompletionPort:
BOOL AssociateDeviceWithCompletionPort(
HANDLE hCompPort, HANDLE hDevice, DWORD dwCompKey) {
HANDLE h = CreateIoCompletionPort(hDevice, hCompPort, dwCompKey, 0);
return(h == hCompPort);
}
AssociateDeviceWithCompletionPort附加了一个从入口到现行完成端口的设备清单。您会传递一个现行完成端口的handle(被先前CreateNewCompletionPort的呼叫所传回)、设备的handle(可以是文件、套接字、mailslot、管道等等)、完成的关键(一个对您有意义的值;作业系统不会管您在此传递的是什么)到函数中。每当您把设备和通讯埠联系在一起,系统便会附加从这个资讯到完成端口的设备清单。
说明
我建议您在心理上分隔这两个呼叫的原因是因为CreateIoCompletionPort函数很复杂。使这个函数变得如此复杂有一个好处:您可以在同一时间内建之一个I/O完成端口并且把它和设备联系在一起。举例来说,以下的程序代码会开启一个文件并且建立一个新的完成端口,并把它和文件联系在一起。所有对文件的I/O请求将会随着CK_FILE之完成关键而完成,通讯埠允许两个线程同时地执行。
#define CK_FILE 1
HANDLE hfile = CreateFile(...);
HANDLE hCompPort = CreateIoCompletionPort(hfile, NULL, CK_FILE, 2);
第二个资料结构是I/O完成伫列。当一个对设备的非同步I/O请求完成时,系统会检查设备和完成端口是否已联系在一起,假如有,系统会附加一个完成I/O请求入口到完成端口之I/O完成伫列的尾端。每个在伫列中的入口皆指出转移位元组的数量、当设备和通讯埠联系在一起时所设定的完成关键值、一个指向I/O请求OVERLAPPED结构的指标以及错误程序代码。我会简短地讨论如何从这个伫列中移除入口的方式。
说明
对设备发布一个I/O请求并且不让I/O完成入口伫列到可能的I/O完成端口中。这通常不是必要的,但是它偶尔会有用。举例来说,当您不管资料传递成功与否皆透过套接字传送资料时。为了要发布一个没有完成入口伫列的I/O请求,必须使用一个有效的事件handle来载入OVERLAPPED结构的hEvent成员并且与1作bitwise-OR的运算,就像这样:
Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
Overlapped.hEvent = (HANDLE) ((DWORD_PTR) Overlapped.hEvent | 1);
ReadFile(..., &Overlapped);
现在您可以利用传递OVERLAPPED结构的位址到要求的函数中(如上面的ReadFile)来发布您的I/O请求。
最好您不需建立一个只用来停止I/O完成的伫列。我想要利用如下的方法来做,但是它并没有作用:
Overlapped.hEvent = 1;
ReadFile(..., &Overlapped);
再者,不要忘了在关闭事件handle之前重放low-order位元。
CloseHandle((HANDLE) ((DWORD_PTR) Overlapped.hEvent & ~1));
I/O完成端口周围的结构
当您的服务应用程序被初始化时,它应经由呼叫如CreateNewCompletionPort的函数来建立I/O完成。然后,应用程序应建立一个线程的集区。您现在问的这个问题是,「有多少线程应该在集区里?」这是个难以回答的问题,我将在稍后的〈有多少线程在集区中?〉一节中提出更多的细节。现在,标准的基本原则是取得在主机上的CPU数量然后将它乘以2。因此在双倍处理器的机器上,您应该建立四个线程的集区。
所有在集区中的线程皆应执行相同的函数。通常在服务程序被命令停止时,这个线程函数会执行一些初始化的动作,然后再进入一个应该终止的回圈。在该回圈内,线程会将自己放置到睡眠状态以等待对完成端口的设备I/O请求完成呼叫GetQueuedCompletionStatus之动作,它是这样子的:
BOOL GetQueuedCompletionStatus(
HANDLE hCompPort,
PDWORD pdwNumBytes,
PULONG_PTR CompKey,
OVERLAPPED** ppOverlapped,
DWORD dwMilliseconds);
第一个hCompPort参数指出线程对监视哪个完成端口有兴趣。许多服务应用程序会使用单一的I/O完成端口,并且完成所有对这个通讯埠的I/O请求通知。基本上,GetQueuedCompletionStatus的工作是将呼叫线程放置到睡眠状态,直到某个入口在特定完成端口之I/O完成伫列中出现,或是指定的逾时情形发生(在dwMilliseconds参数中指定)为止。
第叁个与I/O完成端口相关联的资料结构即是等待线程伫列。当线程集区中的每个线程呼叫GetQueuedCompletionStatus时,呼叫线程的ID被放置到等待线程伫列中,使I/O完成端口发布核心物件可以随时知道哪个线程正在等待处理完成的I/O请求。当某个入口在通讯埠的I/O完成伫列中出现时,完成端口便会叫醒等待线程伫列的其中一个线程。这个线程取得构成完成I/O入口的部分资讯:转移位元组的数量、完成的关键以及OVERLAPPED结构的位址。这个资讯会被传递到GetQueuedCompletionStatus的pdwNumBytes、pCompKey以及ppOverlapped参数中,并被传回给线程。
要判断GetQueuedCompletionStatus被传回的原因有点困难,以下的程序代码示范了恰当的实作方式:
DWORD dwNumBytes;
ULONG_PTR CompKey;
OVERLAPPED* pOverlapped;
// hIOCP在程序的其他地方被初始化
BOOL fOk = GetQueuedCompletionStatus(hIOCP,
&dwNumBytes, &CompKey, &pOverlapped, 1000);
DWORD dwError = GetLastError();
if (fOk) {
// 程序成功地完成I/O请求
} else {
if (pOverlapped != NULL) {
// 处理一个失败的完成I/O请求
// dwError包括失败的原因
} else {
if (dwError == WAIT_TIMEOUT) {
// 等待完成的I/O入口逾时
} else {
// 对GetQueuedCompletionStatus不适当的呼叫
// dwError包括不适当呼叫的原因
}
}
}
如您所预期的,入口以先进先出(FIFO)的方式从I/O完成伫列中移除。然而,您可能没有预料到呼叫GetQueuedCompletionStatus的线程却是以后进先出(LIFO)的方式被叫醒。这样做的原因是为了改进效能。举例来说,假定四个线程在等待线程伫列中等待。此时若某个单一完成的I/O入口出现,则最后一个呼叫GetQueuedCompletionStatus的线程会醒来处理这个入口。当这个线程完成处理入口后,线程会再次呼叫GetQueuedCompletionStatus并进入等待线程伫列。此刻假如另一个I/O完成入口出现,刚才处理入口的同一个线程会被叫醒并处理新的入口。
只要I/O请求缓慢地完成以使单一线程可以处理,此时系统只会保持一个线程为清醒的状态,而其他叁个线程则持续睡眠状态。经由使用LIFO演算法,没有排程的线程可以使它们的内存资源(例如堆叠空间)被交换到磁盘并且使处理器的快取被注满。这个使许多线程伺候一个完成端口的方法相当不错,假如您要使数个线程等待而完成少数的I/O请求,则额外的线程会使大部分资源自系统离开后交换。
I/O完成端口如何管理线程集区
此刻是讨论为何I/O完成端口是如此有帮助的时候了。第一,当您建立I/O完成端口时,您会具体指定线程可以同时执行的数量。就像我说的,您通常会设定这个值为主机上CPU的数量。当完成的I/O入口被伫列后,I/O完成端口应该叫醒等待中的线程。然而,完成端口将只叫醒您所指定的线程数量。因此,假如四个I/O请求完成,且四个线程在等待对GetQueuedCompletion Status的呼叫,此时I/O完成端口只允许两个线程醒来;其他两个线程则继续保持睡眠状态。当每个线程处理了一个完成的I/O入口后,它再次呼叫GetQueuedCompletionStatus。此时系统看到更多的入口被伫列并叫醒相同的线程去处里剩馀的入口。
如果您小心谨慎地思考,应该会注意到有些事实在毫无意义:如果完成端口至今只允许指定数量的线程同时醒来,为什么要让那么多线程在线程集区中等待?举例来说,假设我正在一台有两个CPU的机器上执行并且建立I/O完成端口,并命令它只能允绪两个线程同时地处理入口。但是我会在线程集区中建立四个线程(CPU数量的两倍)。它看起来好像是建立两个不会处理任何事情的线程。
但I/O完成端口是非常聪明的。当某个完成端口叫醒一个线程时,完成端口会将线程的ID放置到与它相关的第四个资料结构中—即一个释放的清单(请看 图2-1 )。这允许完成端口记得它所叫醒的线程并且监视这些线程的执行。假如某个被释放的线程呼叫了任一个放置线程至等待状态的函数,则完成端口会察觉并且将它从被释放的线程清单中移到暂停线程清单中,以更新它内部的资料结构(第五个和最后一个资料结构是I/O完成端口的一部份)。
完成端口的目标是保持释放线程清单中的入口与建立完成端口时所指定使用同时发生的线程数量相同。假如某个被释放的线程因为任一理由而进入等待状态,则被释放的线程清单会缩短,而完成端口则会释放另一个等待中的线程。假如某个暂停的线程醒来,它会离开暂停的线程清单并且再进入被释放的线程清单中。这个被释放线程清单的方法可以拥有比最大的同时值所允许的入口还多。
说明
一旦线程呼叫了GetQueuedCompletionStatus,这个线程即表示为指定的完成端口。系统会假定所有被分配的线程,以代表完成端口在做事。只要执行分配的线程数量少于完成端口的最大同时值,完成端口将会从集区叫醒线程。您可以用下列叁种方式之一来中断线程/完成端口分配的任务:
- 使线程离开。
- 使线程呼叫GetQueuedCompletionStatus并传递一个不同I/O完成端口的handle。
- 毁坏线程分配的I/O完成端口。
现在让我们将这些内容联系起来。假定我们还是在一台有两个CPU的机器上执行。我们建立一个只允许两个线程同时醒来的完成端口,并且建立四个等待完成I/O请求的线程。假如叁个伫列到通讯埠的完成I/O请求中,只使两个线程察觉到并处理请求,则减少了可执行的线程数量并且省下内文交换的时间。此时如为目前其中一个线程呼叫了Sleep、WaitForSingleObject、WaitForMultipleObjects和SignalObjectAndWait,一个同时的I/O呼叫,或是任何一个导致线程不可执行的函数,I/O完成将会察觉并且立即叫醒第叁个线程。完成端口的目标是使CPU保持工作的状态。
最后,第一个线程将再次变为可执行的。当它发生时,可执行的线程数量会比系统中的CPU数量还高。然而,完成端口会再次意识到它并且不允许任何额外的线程醒来,直到线程的数量降到CPU数量以下为止。I/O完成端口架构假设了可执行之线程数量只在短时间内停留在最大值之上,然后当线程四处徘徊且再次呼叫GetQueued-CompletionStatus时,会很快地逐渐消失。这解释了为什么线程集区中包含的线程应该多于完成端口所设定同时发生的线程总数。
有多少线程在集区中?
这是讨论线程集区中应该有多少线程的好时机。我们要考虑到两个议题。首先,当服务应用程序初始化时,您应该建立一组最小量之线程以使其不须在标准的基础上建立及毁坏线程。要记得建立和毁坏线程会浪费CPU时间,所以您最好尽可能使这个程序减到最少。第二,您应该设定线程的最大数量,因为建立太多线程会浪费系统资源。即使大部分的资源可以自RAM交换离开,使得系统资源的使用减到最少,如果您能做到的话,甚至一页的文件空间也不要浪费。
您大概想要实验不同的线程数量。大部分的服务(包括Microsoft Internet Information Services)皆使用尝试错误的演算法去管理它们的线程集区。我建议您也这样做。举例来说,您可以建立如下的变数去管理线程集区:
LONG g_nThreadsMin; // 集区中线程的最小数量
LONG g_nThreadsMax; // 集区中线程的最大数量
LONG g_nThreadsCrnt; // 集区中线程的当前数量
LONG g_nThreadsBusy; // 集区中忙碌的线程数量
当您的应用程序初始化时,您可以指定g_nThreadsMin数量的线程皆执行相同的线程集区函数。以下的虚拟程序代码说明了这个线程函数可能的样子:
DWORD WINAPI ThreadPoolFunc(PVOID pv) {
// 线程进入集区
InterlockedIncrement(&g_nThreadsCrnt);
InterlockedIncrement(&g_nThreadsBusy);
for (BOOL fStayInPool = TRUE; fStayInPool;) {
// 线程停止执行并且等待做某些事
InterlockedDecrement(&m_nThreadsBusy);
BOOL fOk = GetQueuedCompletionStatus(...);
DWORD dwIOError = GetLastError();
// 线程在做某些事,所以它是忙碌的
int nThreadsBusy = InterlockedIncrement(&m_nThreadsBusy);
// 我们应该加另一个线程到集区吗?
if (nThreadsBusy == m_nThreadsCrnt) { // 所有的线程都是忙碌的
if (nThreadsBusy < m_nThreadsMax) { // 集区不是满的
if (GetCPUUsage() < 75) { // CPU使用率低于75%
// 增加线程到集区
CloseHandle(chBEGINTHREADEX(...));
}
}
}
if (!fOk && (dwIOError == WAIT_TIMEOUT)) { // 线程逾时
if (!ThreadHasIoPending()) {
// 服务器没有重要的事做,而且这个线程
// 可能会死掉,因为它没有未完成的I/O请求
fStayInPool = FALSE;
}
}
if (fOk || (po != NULL)) {
// 线程醒来处理某些事;处理它
...
if (GetCPUUsage() > 90) { // CPU使用率高于90%
if (!ThreadHasIoPending()) { // 没有未决定的I/O请求
if (g_nThreadsCrnt > g_nThreadsMin)) { // 集区在最小量之上
fStayInPool = FALSE; //从集区中移除线程
}
}
}
}
}
// 线程离开集区
InterlockedDecrement(&g_nThreadsBusy);
InterlockedDecrement(&g_nThreadsCurrent);
return(0);
}这个虚拟程序代码显示出当您在使用I/O完成端口时所能获得的创造力。GetCPUUsage及ThreadHasIoPending函数不是Windows API的一部分。假如您想要使用它们,您必须自己实作这个函数。另外,您必须确保线程集区中至少随时包含一个线程,否则客户端将永远不会得到照顾。您可以使用我的虚拟程序代码为指导,但是若为您的特殊服务建立不同的程序代码的话,可能会执行得更好。
说明
本章稍早的〈取消伫列中的设备I/O请求〉一节中,曾经提到当线程结束时,系统会自动取消所有这个线程所发布之悬而未决的I/O请求。这就是为什么在虚拟程序代码中类似Thread HasIoPending的函数是必要的。假如线程拥有未完成的I/O请求,请不要允许它们终止。
许多服务提供了一个可让管理者控制线程集区行为的管理工具-举例来说,为了设定线程的最小和最大数量、CPU时间使用的门槛以及当建立I/O完成端口时所使用的最大同步值。
模拟已完成的I/O请求
I/O完成端口根本不必与设备I/O一起使用,不过本章还是会说明线程之间的通讯技巧,而I/O完成端口核心物件是个用来帮助这件事的了不起机制。在本章〈可警告I/O〉一节中介绍了QueueUserAPC函数,它允许线程分发某个APC入口到另一个线程中。I/O完成端口也有个类似的函数,即PostQueuedCompletionStatus:
BOOL PostQueuedCompletionStatus(
HANDLE hCompPort,
DWORD dwNumBytes,
ULONG_PTR CompKey,
OVERLAPPED* pOverlapped);
这个函数将一个己完成之I/O通知附加到I/O完成端口伫列中。第一个hCompPort参数确认您希望伫列入口的完成端口;剩馀的叁个参数:dwNumBytes、CompKey以及pOverlapped,则指出线程呼叫GetQueuedCompletionStatus后应该传回的值。当线程从I/O完成伫列取出一个模拟的入口时,GetQueuedCompletionStatus会传回TRUE以表示一个执行成功的I/O请求。
PostQueuedCompletionStatus函数极为有-它提供您一个与集区中所有与线程通讯的方法。举例来说,当使用它来终止一个服务应用程序时,您想要让所有的线程都离开;然而如果线程正在等待某个完成端口而且没有I/O请求到达,则该线程不能被唤醒。在集区中的每个线程只要经由呼叫PostQueuedCompletionStatus就可以醒来,检查从GetQueuedCompletionStatus所传回的值,您会发现应用程序结束并且已被适当地清理及离开。
当您使用像刚才所叙述的线程终止技巧时必须很小心。因为在集区中的线程即将结束并且不再呼叫GetQueuedCompletionStatus,所以范例可以行得通。然而,假如您想要通知每个集区的线程并且使它们四处徘徊,以便再次呼叫GetQueuedCompletionStatus时,会有个问题产生,因为线程会以LIFO的次序醒来。所以为了保证每个集区线程皆能获得察看它I/O入口的机会。如果没有这个额外的线程同步,一个线程可能会察看相同的通知好几次。
FileCopy范例应用程序
在本章结尾列表2-1的FileCopy范例应用程序(「02 FileCopy.exe」)示范了I/O完成端口的使用方法。应用程序的原始码和原始文件在随书光碟里的02-FileCopy目录中。这个程序会简单地复制使用者所指定的文件到一个名为FileCopy.cpy的新文件中。当使用者执行了FileCopy程序,会出现图2-2中的对话方块。
| 图2-2 FileCopy应用程序的对话方块 |
使用者可以点选路径名称按钮以选择被复制的文件,然后路径名称及文件大小栏位会被更新。当使用者点选复制按钮后,程序所呼叫的FileCopy函数会将所有困难的工作完成。
让我们全神贯注在FileCopy函数的讨论上。
当准备复制时,FileCopy会打开来源文件并撷取它的大小。我想要让文件复制的执行越快越好,所以使用了FILE_FLAG_NO_BUFFERING标记来开启文件。使用FILE_FLAG_NO_BUFFERING标记开启文件可让我直接存取文件,当允许系统的快取去「帮助」存取文件时可越过间接带来的额外内存复制。当然,直接存取文件对我来说意味着更多的工作:必须随时使用以磁盘容量区段大小为倍数的位移量来存取文件,而且也必须读取和写入以区段大小为倍数的资料。我选择以肯定是区段大小倍数的BUFFSIZE(64 KB)区块(chunks)来转移文件的资料。这就是为什么我会集中来源文件的大小,使其成为BUFFSIZE的倍数的缘故。您将注意到来源文件是以FILE_FLAG_OVERLAPPED标记开启的,以使对文件的I/O请求以非同步的方式执行。
目标文件以相同地方式被开启:FILE_FLAG_NO_BUFFERING及FILE_FLAG_OVERLAPPED标记都被指定。当建立目标文件时,也传递来源文件的handle作为CreateFile的hfileTemplate参数,使目的文件拥有和来源文件相同的属性。
说明
一旦这两个文件被开启,目标文件大小会经由呼叫SetFilePointerEx及SetEndOfFile而立即被设定成它最大的大小。立刻调整目标文件的大小是非常重要的,因为NTFS维持着一个高水平线,它会指出文件被写入的最高位置。假如您在这个记号前读取,系统会知道要回传0。假如您在这个记号前写入,从旧的高水位线到写入位移量的文件资料会被0填满,您的资料被写入文件且文件的高水位线会被更新。这种行为满足了有关不要呈现先前资料的C2安全规定。当您写入NTFS分割上的文件结尾使得高水位线移动时,即使被要求使用非同步I/O,NTFS也必须以同步的方式来执行I/O请求。假如FileCopy函数没有设定目标文件的大小,则没有任何重叠的I/O请求会被非同步地执行。
现在文件已经开启且准备好被处理了,FileCopy建立了一个I/O完成端口。为了使其容易与I/O完成端口一起工作,我建立了一个小的C++ 类别—CIOCP,这是一个非常简单封装的I/O完成端口函数。这个类别可以在附录B之〈类别库〉中所讨论的IOCP.h文件中找到。FileCopy经由建立一个CIOCP类别的实例(称为iocp)来创造I/O完成端口。
来源文件及目标文件会经由呼叫CIOCP的AssociateDevice成员函数来与完成端口关联。当与完成端口关联时,每个设备会被分配到一个完成的识别码。当某个对来源文件的I/O请求完成时,该完成识别码会是CK_READ,以指出读取操作必须被完成。同样地,对目标文件的I/P请求完成时,完成识别码是CK_WRITE,它指出读写操作必须被完成。
现在我们准备好初始化一组I/O请求(OVERLAPPED结构)以及它们的内存缓冲器。FileCopy函数在任何时间内皆会保留四个(MAX_PENDING_IO_REQS)重要的I/O请求。对您自己的应用程序来说,您可能宁愿允许I/O请求的数量在需要的时候动态地增加或减少。在FileCopy程序中之CIOReq类别封装了一个单一的I/O请求。如您所看到的,C++ 类别封装了单一的I/O请求。该C++ 类别来自OVERLAPPED结构,但是它包含了一些附加的内容资讯。FileCopy分配一个CIOReq物件的阵列并呼叫AllocBuffer方法将BUFFSIZE大小的资料缓冲器与每个I/O请求物件相关联。使用VirtualAlloc函数来分配资料缓冲器。使用VirtualAlloc来确保区块从适当间距的边界开始,这个界限满足了FILE_FLAG_ NO_BUFFERING标记的需求:缓冲器必须经由容量之区段大小平均地分割。为了发布来源文件初始的读取请求,我执行了一些小技巧:分发四个CK_WRITE I/O完成通知到I/O完成端口中。当主要的回圈被执行时,线程会在通讯埠上等待并且立即醒来,且认为写入操作已经完成。这会导致线程发布一个对来源文件的读取请求,即实际开始文件复制。
当没有未完成的I/O请求时,主要的回圈结束。
只要I/O请求未被完成,回圈内部便会经由呼叫CIOCP的GetStatus方法(在内部呼叫GetQueuedCompletionStatu)在完成端口上等待。这个呼叫将线程改变成睡眠状态,直到对完成端口的I/O请求完成为止。当GetQueuedCompletion Status返回时,会检查被传回CompKey的完成识别码。假如CompKey为CK_READ,则表示对来源文件的I/O请求已完成。然后再呼叫CIOReq的Write方法来发布对目标文件的写入I/O请求。假如CompKey为CK_WRITE,则表示对目标文件的I/O请求已完成。如果没有读取到来源文件的结尾,则呼叫CIOReq的Read方法继续读取来源文件。
当不再有未完成的I/O请求时,可以经由关闭来源及目标文件handles来使回圈结束及清理。在FileCopy返回前,它必须多做一个工作:它必须将目标文件的大小与来源文件的大小调整为相同。为了达成目的,并没有指定FILE_FLAG_NO_BUFFERING标记以重新开启目标文件。也正因为没有使用这个标记,所以文件不必在区段的界线上执行操作,这能让目标文件的大小缩至与来源文件大小相同。
FileCopy.cpp
/********************************************************************
模组:FileCopy.cpp
通告:Copyright (c)2000 Jeffrey Richter
********************************************************************/
#include "..\CmnHdr.h" // 请参阅附录A
#include
#include "..\ClassLib \IOCP.h" //请参阅附录B
#include "..\ClassLib \EnsureCleanup.h" //请参阅附录B
#include "Resource.h"
///////////////////////////////////////////////////////////////////////////////
// 每个I/O请求需要OVERLAPPED结构及资料缓冲区
class CIOReq :public OVERLAPPED {
public:
CIOReq() {
Internal = InternalHigh = 0;
Offset = OffsetHigh = 0;
hEvent = NULL;
m_nBuffSize = 0;
m_pvData = NULL;
}
~CIOReq() {
if (m_pvData != NULL)
VirtualFree(m_pvData, 0, MEM_RELEASE);
}
BOOL AllocBuffer(SIZE_T nBuffSize) {
m_nBuffSize = nBuffSize;
m_pvData = VirtualAlloc(NULL, m_nBuffSize, MEM_COMMIT, PAGE_READWRITE);
return(m_pvData != NULL);
}
BOOL Read(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
if (pliOffset != NULL) {
Offset = pliOffset->LowPart;
OffsetHigh =pliOffset->HighPart;
}
return(::ReadFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
}
BOOL Write(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
if (pliOffset != NULL) {
Offset = pliOffset->LowPart;
OffsetHigh = pliOffset->HighPart;
}
return(::WriteFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
}
private:
SIZE_T m_nBuffSize;
PVOID m_pvData;
};
///////////////////////////////////////////////////////////////////////////////
#define BUFFSIZE (64 * 1024) // I/O缓冲器的大小
#define MAX_PENDING_IO_REQS 4 // I/O的最大数量
// 完成识别值,以指出完成的I/O型态
#define CK_READ 1
#define CK_WRITE 2
///////////////////////////////////////////////////////////////////////////////
BOOL FileCopy(PCTSTR pszFileSrc, PCTSTR pszFileDst) {
BOOL fOk = FALSE; // 假定文件复制失败
LARGE_INTEGER liFileSizeSrc = {0 }, liFileSizeDst;
try {
{
// 没有缓冲就开启来源文件并撷取文件的大小
CEnsureCloseFile hfileSrc = CreateFile(pszFileSrc, GENERIC_READ,
FILE_SHARE_READ, NULL, OPEN_EXISTING,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL);
if (hfileSrc.IsInvalid()) goto leave;
// 撷取文件的大小
GetFileSizeEx(hfileSrc, &liFileSizeSrc);
// 非缓冲的I/O请求区段大小转移
// 我使用缓冲器大小的转移,因为它较容易计算
liFileSizeDst.QuadPart = chROUNDUP(liFileSizeSrc.QuadPart, BUFFSIZE);
// 没有缓冲就开启目标文件并撷取文件的大小
CEnsureCloseFile hfileDst = CreateFile(pszFileDst, GENERIC_WRITE,
0, NULL, CREATE_ALWAYS,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, hfileSrc);
if (hfileDst.IsInvalid()) goto leave;
// 文件系统同时扩大文件。现在扩大目标文件
// 以至于I/O非同步地执行改进了效能
SetFilePointerEx(hfileDst, liFileSizeDst, NULL, FILE_BEGIN);
SetEndOfFile(hfileDst);
// 建立I/O完成端口并且与文件相关联。
CIOCP iocp(0);
iocp.AssociateDevice(hfileSrc, CK_READ); // 从来源文件读取
iocp.AssociateDevice(hfileDst, CK_WRITE); // 写入目标文件
// 初始化保持记录变数
CIOReq ior[MAX_PENDING_IO_REQS];
LARGE_INTEGER liNextReadOffset = { 0 };
int nReadsInProgress = 0;
int nWritesInProgress = 0;
// 最初的文件复制经由模拟写入已经完成来发动
//这导致读取操作被发布
for (int nIOReq = 0;nIOReq < chDIMOF(ior); nIOReq++) {
// 每个I/O请求需要转移的资料缓冲器
chVERIFY(ior[nIOReq].AllocBuffer(BUFFSIZE));
nWritesInProgress++;
iocp.PostStatus(CK_WRITE, 0, &ior[nIOReq]);
}
// 当未完成的I/O仍然存在时继续执行回圈
while ((nReadsInProgress > 0) || (nWritesInProgress > 0)) {
// 悬置线程直到I/O完成
ULONG_PTR CompKey;
DWORD dwNumBytes;
CIOReq* pior;
iocp.GetStatus(&CompKey, &dwNumBytes, (OVERLAPPED**) &pior,
INFINITE);
switch (CompKey) {
case CK_READ: // 读取完成,写至目的地
nReadsInProgress--;
pior->Write(hfileDst); // 写入与从来源读取出来相同的位移量
nWritesInProgress++;
break;
case CK_WRITE: // 写入完成,从来源读取
nWritesInProgress--;
if (liNextReadOffset.QuadPart < liFileSizeDst.QuadPart) {
// 不是EOF,读取下一个来源文件的区块资料
pior->Read(hfileSrc, &liNextReadOffset);
nReadsInProgress++;
liNextReadOffset.QuadPart += BUFFSIZE; // 推进来源位移量
}
break;
}
}
fOk =TRUE;
}
leave:;
}
catch (...) {
}
if (fOk) {
// 目标文件大小是分页大小的倍数。开启有缓冲
// 的文件以缩减它的大小,使其成为来源文件的大小
CEnsureCloseFile hfileDst = CreateFile(pszFileDst, GENERIC_WRITE,
0, NULL, OPEN_EXISTING, 0, NULL);
if (hfileDst.IsValid()) {
SetFilePointerEx(hfileDst, liFileSizeSrc, NULL, FILE_BEGIN);
SetEndOfFile(hfileDst);
}
}
return(fOk);
}
///////////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam) {
chSETDLGICONS(hwnd, IDI_FILECOPY);
// 使复制按钮失效,因为还没选择文件
EnableWindow(GetDlgItem(hwnd, IDOK), FALSE);
return(TRUE);
}
///////////////////////////////////////////////////////////////////////////////
void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtl, UINT codeNotify) {
TCHAR szPathname [_MAX_PATH ];
switch (id) {
case IDCANCEL:
EndDialog(hwnd, id);
break;
case IDOK:
// 复制来源文件到目标文件
Static_GetText(GetDlgItem(hwnd, IDC_SRCFILE),
szPathname, sizeof(szPathname));
SetCursor(LoadCursor(NULL, IDC_WAIT));
chMB(FileCopy(szPathname, TEXT("FileCopy.cpy"))
? "File Copy Successful" : "File Copy Failed");
break;
case IDC_PATHNAME:
OPENFILENAME ofn = { OPENFILENAME_SIZE_VERSION_400 };
ofn.hwndOwner = hwnd;
ofn.lpstrFilter = TEXT("*.*\0 ");
lstrcpy(szPathname, TEXT("*.*"));
ofn.lpstrFile = szPathname;
ofn.nMaxFile = chDIMOF(szPathname);
ofn.lpstrTitle = TEXT("Select file to copy");
ofn.Flags = OFN_EXPLORER | OFN_FILEMUSTEXIST;
BOOL fOk = GetOpenFileName(&ofn);
if (fOk) {
// 显示来源文件大小给使用者
Static_SetText(GetDlgItem(hwnd, IDC_SRCFILE), szPathname);
CEnsureCloseFile hfile = CreateFile(szPathname, 0, 0, NULL,
OPEN_EXISTING, 0, NULL);
if (hfile.IsValid()) {
LARGE_INTEGER liFileSize;
GetFileSizeEx(hfile, &liFileSize);
// 注意:只出现大小的下面(bottom)32位元
SetDlgItemInt(hwnd, IDC_SRCFILESIZE, liFileSize.LowPart, FALSE);
}
}
EnableWindow(GetDlgItem(hwnd, IDOK), fOk);
break;
}
}
///////////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam) {
switch (uMsg) {
chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);
}
return(FALSE);
}
///////////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) {
DialogBox(hinstExe, MAKEINTRESOURCE(IDD_FILECOPY), NULL, Dlg_Proc);
return(0);
} //////////////////////////////// End of File //////////////////////////////////
| 列表2-1 FileCopy范例应用程序
|