Douglas C. Schmidt Irfan Pyarali
本文关注ACE构架[2]中的反应堆(Reactor)模式[1]的设计和实现。反应堆模式处理由一或多个客户并发地递送给应用的服务请求。应用的每个服务可由一或多个方法组成,并由一个单独的事件处理器代表;事件处理器负责分派服务特有的请求。
在本论文描述的反应堆模式的实现中,事件处理器分派由ACE_Reactor对象来完成。ACE_Reactor结合了I/O事件,以及其他类型的事件,比如定时器和信号的多路分离。在此实现的核心是一个同步的事件多路分离器,例如,select[3]或WaitForMultipleObjects[4]。当事件发生时,ACE_Reactor自动分派预登记的事件处理器的方法,后者随即执行应用指定的服务。
本论文被组织如下:7.2描述ACE_Reactor构架中的主要特性;7.3概述ACE_Reactor实现的OO设计[2];7.4检查若干例子、演示ACE_Reactor怎样简化并发的事件驱动网络应用的开发;7.5描述使用ACE_Reactor开发事件驱动应用时所应遵循的设计规则;7.6给出结束语。
ACE_Reactor提供OO多路分离和分派构架,它简化了事件驱动应用的开发。下面的段落描述反应堆构架提供给应用开发者的特性:
统一的OO多路分离和分派接口:使用ACE_Reacotr的应用并不直接访问诸如select或WaitForMultipleObjects这样的低级事件多路分离系统调用,而是通过从ACE_Event_Handler基类继承来创建具体的事件处理器。该类指定处理多种类型事件的虚方法,比如I/O事件、定时器事件、信号和同步事件。图7-1演示ACE_Reactor中的关键组件。它演示了实现7.4描述的日志服务器的具体事件处理器。使用反应堆构架的应用创建具体事件处理器,并将它们登记到ACE_Reactor。
使事件处理器分派自动化:当ACE_Reactor管理的处理器上有活动发生时,反应堆自动调用适当的预登记的具体事件处理器的虚方法。C++对象被登记到ACE_Reactor。对象、而不是单独的函数的使用,允许在对具体事件处理器的挂钩方法的调用之间,状态可以方便地保持。这种风格的OO编程对于开发在多次客户调用间保持状态的事件处理器来说很有用。
支持透明的可扩展性:ACE_Reactor的功能和它的已登记的事件处理器可被透明地扩展,而无须修改或重编译现有代码。为实现这样的扩展性,反应堆构架采用继承和动态绑定来去耦(1)它的较低级的事件多路分离和分派机制与(2)应用定义的处理事件的较高级的策略。
ACE_Reactor管理的低级机制包括检测多个I/O句柄上的事件、使定时器到期,以及分派适当的事件处理器方法来处理这些事件。应用特有的具体事件处理器执行的较高级策略包括连接建立策略、数据编码和解码,以及处理来自客户的服务请求。例如,TAO[5]实时CORBA ORB使用反应堆构架来分离它的低级事件多路分离机制和它的GIOP连接管理和协议处理[5]的较高级策略。

图7-1 反应堆组件
增加复用:ACE_Reactor的多路分离和分派机制可被许多网络应用复用。通过复用,而不是重新发明,开发者可以专注于较高级的应用特有的策略,而不是反复与较低级事件多路分离和分派机制相纠缠。
相反,直接使用像select和WaitForMultipleObject这样的低级事件多路分离操作的程序员必须为每一个应用重新实现、调试和调谐同样的多路分离和分派代码。而所有利用ACE_Reactor的应用自动地复用它的特性,以及将来的增强和优化。
增强类型安全性:ACE_Reactor将应用开发者和易错的细节(它们与使用像select这样的低级事件多路分离系统调用来编程是相关联的)屏蔽开来。这些细节涉及设置和清除位掩码、处理超时和中断,以及分派挂钩方法。特别地,ACE_Reacotr消除了导致select出错的若干微妙原因,这些错误涉及fd_set位掩码的误用。
改善可移植性:ACE_Reactor在若干事件多路分离机制上运行,其中包括Win32 WaitForMultipleObjects和select(它在UNIX和Win32上都可用)。ACE_Reactor将应用和底层事件多路分离机制的移植性差异屏蔽开来。如图7-6所示,ACE_Reactor向应用输出同样的接口,而不管底层系统调用是什么。而且,ACE_Reactor使用像桥接(Bridge)[6]这样的设计模式来增强它内部的可移植性。因而,将ACE_Reactor从select移植到WaitForMultipleObjects仅需要对构架进行一些局部变动[7]。
线程安全性:反应堆构架是完全多线程的。所以多个线程可安全地共享单个ACE_Reactor。同样地,多个ACE_Reactor也可在一个进程的不同线程中运行。反应堆构架提供必要的同步机制来防止条件竞争和类中方法的死锁[8]。
高效的多路分离:ACE_Reactor非常高效地执行它的事件多路分离逻辑。例如,基于select的ACE_Reactor使用7.3.2中描述的ACE_Handle_Set类来避免每次只查看fd_set位掩码的一位。这一优化基于一种成熟的算法,使用异或操作符来将运行时复杂度从O(总位数)降低到O(已置位的位数)。这充分地减少了运行时开销。
这一部分描述反应堆构架的OO设计。主要焦点在于它的组件的结构和关键的设计决策。在适当的地方还讨论了实现细节。7.3.1概述OS平台无关的组件,而7.3.2覆盖平台相关的组件。
这一小部分总结反应堆构架中的平台无关的类,其中包括ACE_Reactor、ACE_Time_Value、ACE_Timer_Queue和ACE_Event_Handler。
ACE_Reactor定义反应堆构架的公共接口。图7-2演示ACE_Reactor类中关键的公共方法。此类中的方法大致被分组为以下几个种类:
管理器方法:构造器和open方法通过动态地分配多个实现对象来创建和初始化ACE_Reactor对象;这些实现对象在下面的7.3.2.1和7.3.2.2中描述。析构器和close方法释放这些对象。
基于I/O的方法:派生自ACE_Event_Handler的具体事件处理器通过反应堆的register_handler方法登记到ACE_Reactor;同样地,具体事件处理器可以通过它的remove_handler方法移除。
基于定时器的方法:ACE_Reactor的定时器策略使用最终期限来评估被调度的定时器的优先级。ACE_Reactor的定时器策略提供的操作包括(1)登记将在用户指定时间执行的具体事件处理器和(2)取消先前登记的事件处理器。
事件循环方法:在登记初始的具体事件处理器之后,应用最终进入一个事件循环,重复调用ACE_Reactor的handle_events方法中的一个。这些方法阻塞应用指定长度的时间,等待各种事件的发生,比如I/O句柄上的同步I/O事件或基于定时器的事件。在事件发生时,ACE_Reactor分派具体事件处理器的适当方法;这些方法已被应用登记以处理这些事件。
class ACE_Reactor
{
public:
// = Initialization and termination methods.
enum { DEFAULT_SIZE = FD_SETSIZE };
// Initialize a Reactor instance that may
// contain SIZE entries (indicates
// to restart system calls after interrupts).
ACE_Reactor (int size, int restart = 0);
virtual int open (int size = DEFAULT_SIZE, int restart = 0);
// Perform cleanup activities to close down
// an instance of an.
void close (void);
?ACE_Reactor (void);
// = I/O-based event handler methods.
// Register anobject according
// to the(s) (e.g., READ_MASK,
// WRITE_MASK, etc.).
virtual int register_handler (ACE_Event_Handler *, ACE_Reactor_Mask);
// Remove the handler associated with the
// appropriate(s).
virtual int remove_handler (ACE_Event_Handler *, ACE_Reactor_Mask);
// = Timer-based event handler methods.
// Register a handler to expire at time.
// Whenexpires the
// method will be called with the current time
// andas parameters. Ifis > 0
// then the handler is reinvoked periodically
// at that. Theis interpreted
// "relative" to the current time of day.
virtual void schedule_timer
(ACE_Event_Handler *,
const void *act,
const ACE_Time_Value &delta,
const ACE_Time_Value &interval);
// Locate and cancel timer.
virtual void cancel_timer (ACE_Event_Handler *);
// = Event-loop methods
// Block process until I/O events occur or timer
// expires, then dispatch activated handler(s).
virtual int handle_events (void);
// Perform a timed event-loop that waits up to TV
// time units for events to occur; if no events
// occur then 0 is returned, otherwise return
// TV - (actual_time_waited).
virtual int handle_events (ACE_Time_Value &tv);
private:
// Pointer to the implementation class,
// e.g.,or
//.
Reactor_Impl *reactor_impl_;
};
图7-2 ACE反应堆接口
该基类指定ACE_Reactor用以控制和协调具体事件处理器的多路分离和分派接口。ACE_Event_Handler接口中的虚方法如图7-3所示。
// Handle portability issues.
#if defined (UNIX)
typedef int ACE_HANDLE;
#elif defined (WIN32)
typedef HANDLE ACE_HANDLE;
#endif /* UNIX */
class ACE_Event_Handler
{
public:
// These values can be bitwise "or’d" together to
// instruct theto check for
// multiple I/O activities on a single handle.
enum {
READ_MASK = 01,
WRITE_MASK = 02,
EXCEPT_MASK = 04,
RWE_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK,
DONT_CALL // Don’t callback to handle_close().
};
// Returns the I/O handle associated with the
// derived object (must be supplied by a subclass).
virtual ACE_HANDLE get_handle (void) const = 0;
// Called when event handler is removed from
// an.
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
// Called when input becomes available.
virtual int handle_input (ACE_HANDLE);
// Called when output is possible.
virtual int handle_output (ACE_HANDLE);
// Called when urgent data is available.
virtual int handle_except (ACE_HANDLE);
// Called when timer expires (stores the
// current time andis the argument given
// when the handler was originally scheduled).
virtual int handle_timeout (const ACE_Time_Value &tv, const void *act = 0);
// Called when signal is triggered by OS.
virtual int handle_signal (int signum);
};
图7-3 ACE事件处理器接口
ACE_Reactor通过派生自ACE_Event_Handler的具体事件处理器来实现它的事件驱动回调机制。具体事件处理器可以有选择地重定义ACE_Event_Handler中的虚方法,以执行应用定义的处理来响应多种类型的事件。这些事件包括(1)同步I/O,例如,读、写和异常;(2)定时器;(3)信号;以及(4)同步,例如,将Win32互斥体从复位切换到置位[4]。
7.3.1.4中描述的ACE_Timer_Queue使用具体事件处理器来处理基于定时器的事件。当此队列管理的定时器到期时,先前排定的(scheduled)事件处理器的handle_timeout方法被调用。有两个参数被传递给该方法:(1)当前时间和(2)void *异步完成令牌(ACT)[9],当事件处理器最初被排定时,此令牌也作为参数被传递给schedule_timer。
当具体事件处理器中的任何方法的返回值<0时,反应堆都自动调用处理器的handle_close清扫方法。应用可对该方法进行编程以执行终止活动,比如关闭日志文件或删除对象分配的动态内存。当handle_close方法返回时,ACE_Reactor从它的内部表中将与之相关联的具体事件处理器移除掉。
具体事件处理器通常要提供一个I/O句柄。当应用调用ACE_Reactor的register_handler方法时,该方法回调具体事件处理器的get_handle方法,以获取底层的I/O句柄。
当应用调用ACE_Reactor的handle_events方法时,所有已登记的具体事件处理器的句柄都被传递给底层的OS事件多路分离调用,例如,select或WaitForMultipleObjects。随后,当I/O事件变为“就绪”时,OS就激活这些句柄。在此时,ACE_Reactor通过调用被定义用来处理该事件的方法,通知适当的具体事件处理器。
该C++包装封装底层OS平台的日期和时间结构,例如,在大多数UNIX平台上定义的struct timeval类型。timeval结构包括两个域,根据秒和微秒来表示时间。其他OS平台,比如POSIX和Win32,使用稍稍不同的时间表示法。为此,ACE_Time_Value类封装这些细节以提供可移植的C++接口。
ACE_Time_Value类的主要方法在图7-4中演示。ACE_Time_Value包装使用操作符重载来简化基于时间的比较。这样的重载允许标准的算术语法被用于涉及时间比较的关系表达式。
// Time value structure from /usr/include/sys/time.h
// struct timeval { long secs; long usecs; };
class ACE_Time_Value
{
public:
ACE_Time_Value (long sec = 0, long usec = 0);
ACE_Time_Value (timeval t);
// Returns sum of twos.
friend ACE_Time_Value operator +
(const ACE_Time_Value &lhs,
const ACE_Time_Value &rhs);
// Returns difference between two
//s.
friend ACE_Time_Value operator -(
const ACE_Time_Value &lhs,
const ACE_Time_Value &rhs);
// Relational and comparison operators for
// normalizeds.
friend int operator <
(const ACE_Time_Value &lhs,
const ACE_Time_Value &rhs);
// Other relation operators...
private:
// ...
};
图7-4 ACE时间值接口
ACE_Time_Value的方法被实现用来“规格化”时间数量。规格化调整timeval结构中的两个域,使之使用能确保精确比较的规范的编码方式。例如,在规格化之后,数量ACE_Time_Value(1, 1000000)将与ACE_Time_Value(2)相等。相反,直接按位比较这些非规格化的类值将检测不到它们的相等。
下面的代码创建两个ACE_Time_Value对象,它们由用户提供的命令行参数加上当前时间来构造。随后显示两个对象之间的正确顺序关系:
int main (int argc, char *argv[])
{
if (argc != 3)
ACE_ERROR_RETURN ((LM_ERROR, "usage: %d" "time1 time2\n"), 1);
ACE_Time_Value time = ACE_OS::gettimeofday ();
ACE_Time_Value timer1 = time + ACE_Time_Value (ACE_OS::atoi (argv[1]));
ACE_Time_Value timer2 = time + ACE_Time_Value (ACE_OS::atoi (argv[2]));
if (timer1 > timer2)
ACE_DEBUG ((LM_DEBUG, "timer 1 is greater\n"));
else if (timer2 > timer1)
ACE_DEBUG ((LM_DEBUG, "timer 2 is greater\n"));
else
ACE_DEBUG ((LM_DEBUG, "timers are equal\n"));
return 0;
}
上面所示的代码对所有OS平台都是完全可移植的。注意像操作符重载和类这样的C++特性的使用是怎样简化与时间相关操作的使用的。
ACE_Reactor的基于定时器的机制对于需要定时器支持的应用来说是很有用的。例如,WWW服务器需要看门狗定时器来释放资源,如果客户在它们连接后不在指定的时间间隔内发送HTTP请求的话。同样地,像Windows NT Service Control Manager[4]这样的看守配置构架要求在它们控制之下的服务周期性地报告它们的当前状态。这些“心跳”消息用于确认服务没有异常地终止。
ACE_Timer_Queue类提供的机制允许应用登记派生自ACE_Event_Handler的、基于定时器的具体事件处理器。ACE_Timer_Queue确保这些事件处理器中的handle_timout方法将来在应用指定的时间被调用。图7-5中所示的ACE_Timer_Queue类的方法使得应用可以调度、取消,和调用定时器对象。
class ACE_Timer_Queue
{
public:
ACE_Timer_Queue (void);
// True if queue is empty, else false.
int is_empty (void) const;
// Returns earliest time in queue.
const ACE_Time_Value &earliest_time (void) const;
// Schedule ato be dispatched at
// theand at subsequent
//s.
int virtual schedule
(ACE_Event_Handler *handler,
const void *act,
const ACE_Time_Value &future_time,
const ACE_Time_Value &interval);
// Cancel all registered
// that match the address of, which
// can be registered multiple times.
int virtual cancel(ACE_Event_Handler *handler);
// Cancel the single
// matching thevalue returned
// from.
int virtual cancel (int timer_id, const void **act = 0);
// Expire all timers <=
// (note, this method must be called manually
// since it is not invoked asynchronously).
void virtual expire(const ACE_Time_Value &expire_time);
private:
// ...
};
图7-5 ACE_Timer_Queue接口
应用调度具体事件处理器,在延迟一定数量的时间之后到期。如果它到期了,act作为值被传递给事件处理器的handle_timeout挂钩方法。如果interval不等于ACE_Time_Value::zero,该值就被用于自动重调度事件处理器。
Schedule方法返回一个定时器id,唯一地标识每个事件处理器在定时器队列的内部表中的登记。定时器id被cancel方法用于在事件处理器到期之前将其移除。如果一个非NULL act被传给cancel,这个act就被设置为应用在定时器最初被排定时所传入的异步完成令牌(ACT)[9]。 这使得程序可以释放动态分配的ACT,以避免内存泄漏。
缺省地,ACE_Reactor所用的ACE_Timer_Queue被作为堆来实现。堆是一种“部分有序的、几乎完全的二进制树”,它确保插入和删除一个具体事件处理器的平均和最坏情况的时间复杂度为O(lg n)。堆表示法对大多数应用实例、特别是实时应用来说都是很适用的。
ACE_Timer_Queue堆由含有ACE_Time_Value、ACE_Event_Handler *,和void *的三元组组成。ACE_Event_Handler * 域指向被排定的定时器对象,该对象的运行时间由ACE_Time_Value域指定。void * 域是在具体事件处理器被最初被排定时所提供的参数。当定时器到期时,这个参数被自动传给7.3.1.2中描述的handle_timeout方法。堆中的每个ACE_Time_Value都以绝对时间单元来存储,例如,按照UNIX gettimeofday系统调用所生成的时间。
在ACE_Timer_Queue接口中使用了虚方法。因而,应用可以扩展缺省的ACE实现来支持其他可选的数据结构,如delta表[11]和定时轮(timing wheel)[12]。delta表将时间存储为“相对的”单元,即相对于表的最前面的ACE_Time_Value的偏移或“delta”。定时轮使用循环缓冲区,使得程序有可能在O(1)时间内启动、停止和维护定时器。ACE构架提供了若干可选的定时器队列的实现。
