ACE技术论文集-第8章 异步事件多路分离和分派处理器
发布: 2008-6-13 14:40 | 作者: Prashant Jain and D | 来源: 转载 | 查看: 430次
Asynchronous Operation Processor必须含有异步执行操作的机制。换句话说,当应用线程调用Asynchronous Operation时,必须不借用应用的线程控制而执行此操作。幸好,现代操作系统提供了用于Asynchronous Operation的机制(例如,POSIX 异步I/O和WinNT重叠式I/O)。在这样的情况下,实现模式的这一部分只需要简单地将平台API映射到上面描述的Asynchronous Operation API。
如果OS平台不提供对Asynchronous Operation的支持,有若干实现技术可用于构建Asynchronous Operation Engine。或许最为直观的解决方案是使用专用线程来为应用执行Asynchronous Operation。要实现线程化的Asynchronous Operation Engine,有三个主要步骤:
- 操作调用:因为操作将在与进行调用的应用线程不同的线程控制中执行,必定会发生某种类型的线程同步。一种方法是为每个操作派生一个线程。更为常用的方法是为Asynchronous Operation Processor而管理一个专用线程池。该方法可能需要应用线程在继续进行其他应用计算之前将操作请求排队。
- 操作执行:既然操作将在专用线程中执行,所以它可以执行“阻塞”操作,而不会直接阻碍应用的进展。例如,在提供异步I/O读取机制时,专用线程可以在从socket或文件句柄中读时阻塞。
- 操作完成:当操作完成时,应用必须被通知到。特别是,专用线程必须将应用特有的通知委托给Completion Dispatcher。这要求在线程间进行另外的同步。
当Completion Dispatcher从Asynchronous Operation Processor接收到操作完成通知时,它会回调与应用对象相关联的Completion Handler。实现Completion Dispatcher涉及两个问题:(1)实现回调以及(2)定义用于执行回调的并发策略。
Completion Dispatcher必须实现一种机制,Completion Handler通过它被调用。这要求Proactive Initiator在发起操作时指定一个回调。下面是常用的回调可选方案:
回调类:Completion Handler输出接口、让Completion Dispatcher知道。当操作完成时,Completion Dispatcher回调此接口中的方法,并将已完成操作的有关信息传递给它(比如从网络连接中读取的字节数)。
函数指针:Completion Dispatcher通过回调函数指针来调用Completion Handler。该方法有效地打破了Completion Dispatcher和Completion Handler之间的知识依赖。这有两个好处:
- Completion Handler不会被迫输出特定的接口;以及
- 在Completion Dispatcher和Completion Handler之间不需要有编译时依赖。
会合点:Proactive Initiator可以设立事件对象或条件变量,用作Completion Dispatcher和Completion Handler之间的会合点。这在Completion Handler是Proactive Initiator时最为常见。在Asynchronous Operation运行至完成的同时,Completion Handler处理其他的活动。Completion Handler将在会合点周期性地检查完成状态。
当操作完成时,Asynchronous Operation Processor将会通知Completion Dispatcher。在这时,Completion Dispatcher可以利用下面的并发策略中的一种来执行应用回调:
动态线程分派:Completion Dispatcher可为每个Completion Handler动态分配一个线程。动态线程分派可通过大多数多线程操作系统来实现。在有些平台上,由于创建和销毁线程资源的开销,这可能是所列出的Completion Dispatcher实现技术中最为低效的一种,
后反应式分派(Post-reactive dispatching):Completion Dispatcher可以发信号给Proactive Initiation所设立的事件对象或条件变量。尽管轮询和派生阻塞在事件对象上的子线程都是可选的方案,最为高效的后反应式分派方法是将事件登记到Reactor。后反应式分派可以通过POSIX实时环境中的aio_suspend和Win32环境中的WaitForMultipleObjects来实现。
Call-through分派:来自Asynchronous Operation Processor的线程控制可被Completion Dispatcher借用,以执行Completion Handler。这种“周期偷取”策略可以通过减少空闲线程的影响范围来提高性能。在一些老操作系统会将上下文切换到空闲线程、又只是从它们切换出去的情况下,这种方法有着收回“失去的”时间的巨大潜力。
Call-through分派在Windows NT中可以使用ReadFileEx和WriteFileEx Win32函数来实现。例如,线程控制可以使用这些调用来等待信号量被置位。当它等待时,线程通知OS它进入了一种称为“可报警等待状态”(alterable wait state)的特殊状态。在这时,OS可以占有对等待中的线程控制的栈和相关资源的控制,以执行Completion Handler。
线程池分派:由Completion Dispatcher拥有的线程池可被用于Completion Handler的执行。在池中的每个线程控制已被动态地分配到可用的CPU。线程池分派可通过Windows NT的I/O完成端口来实现。
在考虑上面描述的Completion Dispatcher技术的适用性时,考虑表8-1中所示的OS环境和物理硬件的可能组合:
|
线程模型 |
系统类型 | |
|
单处理器 |
多处理器 | |
|
单线程 |
A |
B |
|
多线程 |
C |
D |
表8-1 Completion Dispatcher并发策略
如果你的OS只支持同步I/O,那就参见反应堆模式[5]。但是,大多数现代操作系统都支持某种类型的异步I/O。
在表8-1的A和B组合中,假定你不等待任何信号量或互斥体,后反应方式的异步I/O很可能是最好的。否则,Call-through实现或许更能回应你的问题。在C组合中,使用Call-through方法。在D组合中,使用线程池方法。在实践中,系统化的经验测量对于选择最为合适的可选方案来说是必需的。
Completion Handler的实现带来以下考虑。
Completion Handler可能需要维护关于特定请求的状态信息。例如,OS可以通知Web服务器,只有一部分文件已被写到网络通信端口。作为结果,Completion Handler可能需要重新发出请求,直到文件被完全写出,或连接变得无效。因此,它必须知道原先指定的文件,还剩多少字节要写,以及在前一个请求开始时文件指针的位置。
没有隐含的限制来阻止Proactive Initiator将多个Asynchronous Operation请求分配给单个Completion Handler。因此,Completion Handler必须在完成通知链中一一“系上”请求特有的状态信息。为完成此工作,Completion Handler可以利用异步完成令牌(Asynchronous Completion Token)模式[8]。
与在任何多线程环境中一样,使用前摄器模式的Completion Handler还是要由它自己来确保对共享资源的访问是线程安全的。但是,Completion Handler不能跨越多个完成通知持有共享资源。否则,就有发生“用餐哲学家问题”的危险[11]。
该问题在于一个合理的线程控制永久等待一个信号量被置位时所产生的死锁。通过设想一个由一群哲学家出席的宴会可以演示这一问题。用餐者围绕一个圆桌就座,在每个哲学家之间只有一支筷子。当哲学家觉得饥饿时,他必须获取在他左边和在他右边的筷子才能用餐。一旦哲学家获得一支筷子,不到吃饱他们就不会放下它。如果所有哲学家都拿起在他们右边的筷子,就会发生死锁,因为他们将永远也不可能拿到左边的筷子。
Completion Dispatcher类型决定在执行时一个Completion Handler是否可占先。当与动态线程和线程池分派器相连时,Completion Handler自然可占先。但是,当与后反应式Completion Dispatcher相连时,Completion Handler并没有对其他Completion Handler的占先权。当由Call-through分派器驱动时,Completion Handler相对于在可报警等待状态的线程控制也没有占先权。
一般而言,处理器不应该执行持续时间长的同步操作,除非使用了多个完成线程,因为应用的总体响应性将会被显著地降低。这样的危险可以通过增强的编程训练来降低。例如,所有Completion Handler被要求用作Proactive Initiator,而不是去执行同步操作。
这一部分显示怎样使用前摄器模式来开发Web服务器。该例子基于ACE构架[4]中的前摄器实现。
当客户连接到Web服务器时,HTTP_Handler的open方法被调用。于是服务器就通过在Asynchronous Operation完成时回调的对象(在此例中是this指针)、用于传输数据的网络连接,以及一旦操作完成时使用的Completion Dispatcher(proactor_)来初始化异步I/O对象。随后读操作异步地启动,而服务器返回事件循环。
当Async read操作完成时,分派器回调HTTP_Handler::handle_read_stream。如果有足够的数据,客户请求就被解析。如果整个客户请求还未完全到达,另一个读操作就会被异步地发起。
在对GET请求的响应中,服务器对所请求文件进行内存映射,并将文件数据异步地写往客户。当写操作完成时,分派器回调HTTP_Handler::handle_write_stream,从而释放动态分配的资源。
附录中含有两个其他的代码实例,使用同步的线程模型和同步的(非阻塞)反应式模型实现Web服务器。
class HTTP_Handler
: public Proactor::Event_Handler
// = TITLE
// Implements the HTTP protocol
// (asynchronous version).
//
// = PATTERN PARTICIPANTS
// Proactive Initiator = HTTP_Handler
// Asynch Op = Network I/O
// Asynch Op Processor = OS
// Completion Dispatcher = Proactor
// Completion Handler = HTPP_Handler
{
public:
void open (Socket_Stream *client)
{
// Initialize state for request
request_.state_ = INCOMPLETE;
// Store reference to client.
client_ = client;
// Initialize asynch read stream
stream_.open (*this, client_->handle (), proactor_);
// Start read asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
// This is called by the Proactor
// when the asynch read completes
void handle_read_stream(u_long bytes_transferred)
{
if (request_.enough_data(bytes_transferred))
parse_request ();
else
// Start reading asynchronously.
stream_.read (request_.buffer (),
request_.buffer_size ());
}
void parse_request (void)
{
// Switch on the HTTP command type.
switch (request_.command ())
{
// Client is requesting a file.
case HTTP_Request::GET:
// Memory map the requested file.
file_.map (request_.filename ());
// Start writing asynchronously.
stream_.write (file_.buffer (), file_.buffer_size ());
break;
// Client is storing a file
// at the server.
case HTTP_Request::PUT:
// ...
}
}
void handle_write_stream(u_long bytes_transferred)
{
if (file_.enough_data(bytes_transferred))
// Success....
else
// Start another asynchronous write
stream_.write (file_.buffer (), file_.buffer_size ());
}
private:
// Set at initialization.
Proactor *proactor_;
// Memory-mapped file_;
Mem_Map file_;
// Socket endpoint.
Socket_Stream *client_;
// HTTP Request holder
HTTP_Request request_;
// Used for Asynch I/O
Asynch_Stream stream_;
};
