ACE技术论文集-第4章 ACE轻量级OS并发机制的OO封装
发布: 2008-6-13 14:52 | 作者: Prashant Jain and D | 来源: 转载 | 查看: 452次
Thread_Control类用于与Thread_Manager类相结合,以使在线程的发起函数中的优雅的线程终止和清扫活动自动化。例如,Thread_Control的构造器存储状态信息。当最初用于调用线程的函数终止时,该信息自动地从相关联的Thread_Manager中移除该线程。该技术能正确地工作,不管(1)函数经由哪一条路径被执行,以及(2)是否有异常被扔出。就这一点来说,Thread_Control类的行为与4.5.2.1介绍的Guard类相类似。
Thread_Control类的接口给出如下:
class Thread_Control
{
public:
// Initialize the thread control object.
// If INSERT != 0, then register the thread
// with the Thread_Manager.
Thread_Control (Thread_Manager *, int add = 0);
// Implicitly kill the thread on exit and
// remove it from its associated Thread_Manager.
?Thread_Control (void);
// Explicitly kill the thread on exit and
// remove it from its associated Thread_Manager.
void *exit (void *status);
// Set the exit status (and return status).
void *set_status (void *status);
// Get the current exit status.
void *get_status (void);
};
Condition类用于在涉及共享数据的条件表达式的状态发生变化时进行阻塞。它封装Solaris线程和POSIX pthreads cond_t同步变量。Condition类接口给出如下:
template
class Condition
{
public:
// Initialize the condition variable.
Condition (const MUTEX &m,
int type = USYNC_THREAD,
void *arg = 0);
// Implicitly destroy the condition variable.
?Condition (void);
// Explicitly destroy the condition variable.
int remove (void);
// Block on condition, or until absolute
// time-of-day has elapsed. If abstime
// == 0 use blocking wait().
int wait (Time_Value *abstime = 0) const;
// Signal one waiting thread.
int signal (void) const;
// Signal *all* waiting threads.
int broadcast (void) const;
private:
cond_t cond_;
// Reference to mutex lock.
const MUTEX &mutex_;
};
注意Win32不提供条件变量抽象。因此,ACE线程库使用其他像信号量和互斥体这样的ACE组件来实现条件变量。
Null_Condition类是上面描述的Condition接口的一种零开销实现。它的方法全都实现为空操作。这对于根本不需要互斥的情况很有用(例如,一个特定的程序或服务将总是在单线程控制中运行,并且/或者不与其他线程争夺对共享资源的访问)。使用Null *类的原因是允许应用参数化它们所需的同步类型,而不需要改动应用代码。Null_Condition类接口给出如下:
template
class Null_Condition
{
public:
Null_Condition (const MUTEX &m,
int type = 0,
void *arg = 0) {}
?Null_Condition (void) {}
int remove (void) { return 0; }
int wait (Time_Value *abstime = 0) const
{
errno = ETIME; return -1;
}
int signal (void) const
{
errno = ETIME; return -1;
}
int broadcast (void) const
{
errno = ETIME; return -1;
}
};
Null_Condition类在“精神”上与4.5.1.5描述的Null_Mutex类是一样的。
Thread_Manager类含有一组机制来对进行协作、以实现集体动作的成组线程进行管理。例如,Thread_Manager类提供的机制(比如suspend_all和resume_all)允许任意数量的参与线程被原子地挂起或恢复。Thread_Manager类还将应用与不同风格的多线程机制(比如Solaris、POSIX和Win32)间的许多不兼容特性屏蔽开来。
Thread_Manager类的接口演示如下:
class Thread_Manager
{
public:
// Initialize the thread manager.
Thread_Manager (int size);
// Implicitly destroy thread manager.
?Thread_Manager (void);
// Initialize the manager with room
// for SIZE threads.
int open (int size = DEFAULT_SIZE);
// Release all resources.
int close (void);
// Create a new thread.
int spawn (THR_FUNC,
long, thread_t * = 0,
void *stack = 0,
size_t stack_size = 0);
// Create N new threads.
int spawn_n (int n, THR_FUNC,
void *args, long flags);
// Clean up when a thread exits.
void *exit (void *status);
// Blocks until there are no
// more threads running.
void wait (void);
// Resume all stopped threads.
int resume_all (void);
// Suspend all threads.
int suspend_all (void);
// Send signum to all stopped threads.
int kill_all (int signal);
private:
// ...
};
Thread_Spawn类提供的一种标准机制管理线程的创建,以并发地处理来自客户的请求。该类作为“线程工厂”,接受来自客户的连接,并“按需”派生线程,以运行由用户提供的服务处理器(SVC_HANDLER)指定的服务。
Thread_Spawn类的接口给出如下:
template
class PEER_ACCEPTOR,
class PEER_ADDR>
class Thread_Spawn
: public Acceptor
PEER_ACCEPTOR,
PEER_ADDR>
{
public:
// = Initialization methods.
Thread_Spawn (Thread_Manager *tm, Reactor *);
virtual int open (const PEER_ADDR &sia, Reactor *);
protected:
virtual int handle_input (int fd);
// Template method that accepts connection
// and spawns a thread.
virtual int handle_close (int fd, Reactor_Mask);
// Called when this factory is closed down.
virtual SVC_HANDLER *make_svc_handler (void);
// Factory method that creates an appropriate
// SVC_HANDLER *.
virtual int thr_flags (void);
// Returns the flags used to spawn a thread.
};
注意此类是怎样从ACE Acceptor类继承的,后者是一种通用工厂,用于被动地连接客户和创建服务处理器[30]。
Task类是ACE中用于创建用户定义的主动对象[16]和被动对象(它们处理应用消息)的中心机制。ACE Task可进行以下活动:
- 可被动态链接;
- 可用作I/O操作的端点;
- 可与多个线程控制相关联(也就是,成为所谓的“主动对象”);
- 可在队列中存储消息,用于后续处理;
- 可执行用户定义的服务。
Task抽象类定义的接口被派生类继承和实现,以提供应用特有的功能。它之所以是一个抽象类,是因为它的接口定义了一些在下面描述的纯虚方法(open、close、put和svc)。通过使Stream类属提供的与应用无关的组件与继承并使用这些组件的应用特有的子类去耦合,将任务定义为抽象类增强了复用。同样地,纯虚函数的使用允许C++编译器确保任务的子类遵从它提供以下功能的义务:
- 初始化和终止方法:派生自Task的子类必须实现open和close方法,它们执行应用特有的Task初始化和终止活动。这些活动通常分配和释放资源,比如连接控制块、I/O句柄和同步锁。
Task可与Module(模块)一起或是分开定义和使用。当与模块一起使用时,任务被成对地存储:一个Task子类处理读端的自下游发到该模块层的消息,另一个处理写端的自上游发到该模块层的消息。
当一个模块被插入流,或从流中被移除时,它的读端和写端的任务子类的open和close方法分别自动地被ASX构架调用。
- 应用特有的处理方法:除了open和close方法,Task的子类还必须定义put和svc方法。这些方法在消息上执行应用特有的处理功能。例如,当消息到达流的头或尾时,作为调用流中的每个任务的put和/或svc方法的结果,它们被“护送”通过一系列互连的任务。
put方法在流中某层的一个Task传递消息给另一层中相邻的Task时被调用。put方法相对于它的调用者同步地运行,也就是,它从起先调用它的put方法的Task那里借用线程控制。该线程控制通常从应用进程“自下而上”、从处理I/O设备中断[31]的线程池“自上而下”发起,或是由事件分派机制(比如在面向连接的传输协议模块中、用于触发重发的定时器驱动的呼出队列)在流内部发起。
如果一个ACE Task作为被动对象执行(也就是,它总是从调用者那里借用线程控制),Task::put方法就是进入该Task的入口,并用作Task在其中执行它的工作的上下文。相反,如果一个ACE Task作为主动对象执行,Task::svc方法就被用于相对于其他Task异步地执行应用特有的处理。不像put,svc方法并不直接从相邻的Task那里调用。相反,它被与它从属的Task相关联的一个分离的线程调用。该线程为Task的svc方法提供执行上下文和线程控制。svc方法运行一个事件循环,持续地等待消息到达Task的Message Queue(消息队列,见下一条目)。
在put或svc方法的实现中,消息可以通过任务的put_next实用方法传递给流中相邻的Task。put_next调用驻留在相邻层中的下一个Task的put方法。这个对put的调用可以从调用者那里借用线程控制,并立即处理消息(也就是,图4-4(1)演示的同步处理方法)。相反地,put方法可以将消息入队,并将处理推迟给它的在分离的线程控制中执行的svc方法(也就是,图4-4(2)演示的异步处理方法)。如在[1]中所讨论的,选择特定的处理方法对性能和编程的容易程度有着显著的影响。

图4-4 调用put和svc方法的可选方案
- 消息排队机制:除了open、close、put和svc纯虚方法接口,每个任务还含有一个Message Queue(消息队列)。Message Queue是ACE中的标准组件,用于在Task间传递消息。而且,当Task作为主动对象执行时,它的Message Queue用于缓存一系列数据消息和控制消息,以在svc方法中作后续处理。在消息到达时,svc方法使消息出队,并执行Task子类的应用特有的处理工作。
Message Queue中可以出现两种类型的消息:简单的和复合的。简单消息含有单个Message Block,而复合消息含有多个链接在一起的Message Block。复合消息通常由一个控制块和紧随的一或多个数据块组成。控制块含有“簿记”信息(比如目的地址和长度域),而数据块含有消息的实际内容。通过传递消息指针而不是拷贝数据,在Task间传递Message Block的开销被降到了最低。
Message Queue含有一对高低水位标变量,用于在流中的相邻Module间实现层到层的流控制。高水位标指示消息队列在进行流控制之前所愿意缓存的消息字节数。低水位标指示在先前已进行流控制的Message Queue不再被视为满的“水位”。
Task类的接口提供如下:
template
class Task : public Service_Object
{
public:
// Initialization/termination methods.
Task (Thread_Manager *thr_mgr = 0,
Message_Queue *mp = 0);
virtual int open (void *flags = 0) = 0;
virtual int close (u_long = 0) = 0;
// Transfer msg into the queue to handle
// immediate processing.
virtual int put (Message_Block *, Time_Value *tv = 0) = 0;
// Run by a daemon thread to handle
// deferred processing.
virtual int svc (void) = 0;
protected:
// Turn the task into an active object..
int activate (long flags);
// Routine that runs the service routine
// as a daemon thread.
static void *svc_run (Task *);
// Tests whether a message can be enqueue
// without blocking.
int can_put (Message_Block *);
// Insert message into the message list.
int putq (Message_Block *, Time_Value * = 0);
// Extract the first message from the list.
int getq (Message_Block *&, Time_Value * = 0);
// Return a message to the queue.
int ungetq (Message_Block *, Time_Value * = 0);
// Transfer message to the adjacent Task
// in a Stream.
int put_next (Message_Block *, Time_Value * = 0);
// Turn the message back around.
int reply (Message_Block *, Time_Value * = 0);
// Task utility routines to identify names.
const char *name (void) const;
Task *sibling (void);
Module *module (void) const;
// Check if queue is a reader.
int is_reader (void);
// Check if queue is a writer.
int is_writer (void);
// Special routines corresponding to
// certain message types.
int flush (u_long flag);
// Manipulate watermarks.
void water_marks (IO_Cntl_Msg::IO_Cntl_Cmds, size_t);
};
最新更新
- ACE_Reactor中使用register_hand... 6-13
- ACE技术论文集-第1章 ACE自适配... 6-13
- ACE技术论文集-第2章 包装外观 W... 6-13
- ACE技术论文集-第3章 高效可移植... 6-13
- ACE技术论文集-第4章 ACE轻量级O... 6-13
- ACE技术论文集-第5章 C/C++线程... 6-13
- ACE技术论文集-第6章 主动对象(Ac... 6-13
- ACE技术论文集-第7章 ACE反应堆R... 6-13
- ACE技术论文集-第8章 异步事件多... 6-13
- ACE技术论文集-第9章连接和初始... 6-13
