ACE技术论文集-第4章 ACE轻量级OS并发机制的OO封装
发布: 2008-6-13 14:52 | 作者: Prashant Jain and D | 来源: 转载 | 查看: 455次
4.5 公共接口和内部设计
这一部分描述ACE OO线程封装库中组件的公共接口和相关的内部设计情况。ACE组件被划分为下面的组:
- 低级C++线程API:为底层的OS线程和同步API提供低级C++封装。低级C++线程API包含在被称为OS的类中。该类封装了各种版本的UNIX以及WIN32之间的所有差异。ACE中的其他组件被编写为只使用该类中的方法,从而使得将ACE移植到新的平台上变得更为容易。
- 高级C++线程库:允许多线程应用被编写为使用更高级的C++特性,像构造器/析构器和模板。高级C++线程API根据低级的ACE OS类来编写。它们被划分为三个组:
-锁定机制(在4.5.1描述)的C++包装。
-本地线程函数(在4.5.6.1描述)的C++包装。
-更高级的线程管理类(在4.5.4.1描述)。
ACE OO线程组件的一个使用实例已在4.4中介绍。这一部分的余下部分对ACE的公共接口、功能和内部设计进行更为全面的讨论。在适当的地方,这一部分描述该C++包装类的私有部分,以演示包装是如何映射到Solaris 2.x线程和同步机制上的。POSIX pthreads和Win32包装的实现是类似的。
ACE C++包装给4.3.5描述的Solaris、POSIX和Win32 OS同步机制提供了一种可移植、线程安全和面向对象的接口。下面的条目概述了这些ACE C++封装的主要好处:
- 提高正确性:通过使作为C++类和结构中的域出现的同步对象的初始化自动化,以及通过保证锁被自动获取和释放来实现。
- 统一的同步接口:所有线程和同步的C++包装都为获取和释放多种类型的锁提供了统一的接口。特别地,ACE锁类属中的所有组件都支持四种通用方法:acquire、try_acquire、release和remove。这种统一性使得开发者有可能使用锁类作为类型参数,来与其他的ACE同步组件(比如在4.5.2.1、4.5.6.2和4.5.1.4定义的那些组件)联合使用。
- 更为直观的错误报告:Solaris 2.x和POSIX pthreads同步函数使用一种不那么标准的机制来将错误返回给调用者。相反,ACE包装使用一种更为标准的方法:如果发生失败就返回-1,并设置errno来指示失败的原因。
- 简化常见使用模式:包装简化了低级线程和同步机制的常见使用模式。所示代码通过使用mutex_t和cond_t的ACE C++包装实现一个简单版本的Dijkstra计数信号量(也就是,P和V分别等价于acquire和release)演示了这一点。
class Semaphore
{
public:
Semaphore (int initial_value)
: count_nonzero_ (lock_)
{
// Automatically acquire lock.
Guard monitor (lock_);
count_ = initial_value;
// Automatically release the lock
}
// Block the thread until the semaphore
// count becomes greater than 0,
// then decrement it.
void acquire (void)
{
// Automatically acquire lock
Guard monitor (lock_);
// Wait until semaphore is available.
while (count_ == 0)
count_nonzero_.wait ();
count_ = count_ - 1;
// Automatically release the lock
}
// Increment the semaphore, potentially
// unblocking a waiting thread.
void release (void)
{
// Automatically acquire lock
Guard monitor (lock_);
// Allow waiter to continue.
if (count_ == 0)
count_nonzero_.signal ();
count_ = count_ + 1;
// Automatically release the lock
}
private:
Thread_Mutex lock_;
Condition count_nonzero_;
u_int count_;
};
注意Condition对象count_nonzero_的构造器是怎样将Thread_Mutex对象lock_和Condition对象绑定在一起的。这简化了Condition::wait调用接口。相反,本地的Solaris和pthreads cond_t cond_wait接口要求每次调用wait时都传递一个互斥体作为参数。
Solaris 2.x和Win32提供一种内建的计数信号量实现(见4.3.5.3的讨论)。但是,POSIX pthreads[3]线程库没有包含信号量。因此,上面所示的类既演示了ACE C++包装的使用,又为ACE线程封装库中POSIX pthreads的可移植Semaphore实现提供了文档。
ACE互斥体包装提供了一种简单而高效的机制来序列化对共享资源的访问。它们封装Solaris和POSIX pthreads mutex_t同步变量,以及Win32的基于HANDLE的互斥体实现。Mutex的类定义如下所示:
class Mutex
{
public:
// Initialize the mutex.
Mutex (int type = USYNC_THREAD);
// Implicitly destroy the mutex.
?Mutex (void);
// Explicitly destroy the mutex.
int remove (void);
// Acquire lock ownership (wait
// for lock to be released).
int acquire (void) const;
// Conditionally acquire lock
// (i.e., don’t wait for lock
// to be released).
int try_acquire (void) const;
// Release lock and unblock
// the next waiting thread.
int release (void) const;
private:
mutex_t lock_;
// Type of synchronization lock.
};
在ACE中,线程可以通过调用Mutex对象的acquire方法来进入临界区。任何对该方法的调用都将会阻塞,直到当前拥有该锁的线程离开它的临界区。要离开临界区,线程调用它当前拥有的Mutex对象的release方法。调用release使得另一个阻塞在该互斥体上的线程能够进入它的临界区。
Thread_Mutex和Process_Mutex类继承自Mutex,并使用它的构造器来创建适当类型的互斥体,如下:
class Thread_Mutex : public Mutex
{
public:
Thread_Mutex (void): Mutex (USYNC_THREAD);
};
class Process_Mutex : public Mutex
{
public:
Thread_Mutex (void): Mutex (USYNC_PROCESS);
};
这些调用被映射到适当的底层API上,以分别创建线程和进程专用的互斥体。特别地,Thread_Mutex的Win32实现使用更为高效、但却不那么强大的CRITICAL_SECTION实现,而Process_Mutex实现则使用较为低效、但却更为强大的Win32互斥体HANDLE。
ACE信号量包装类实现Dijkstra的“计数信号量”抽象,这是一种用于序列化多个线程控制的通用机制。它们封装Solaris sema_t同步变量。Semaphore类接口如下所示:
class Semaphore
{
public:
// Initialize the semaphore,
// with default value of "count".
Semaphore (u_int count,
int type = USYNC_THREAD,
void * = 0);
// Implicitly destroy the semaphore.
?Semaphore (void);
// Explicitly destroy the semaphore.
int remove (void);
// Block the thread until the semaphore count
// becomes greater than 0, then decrement it.
int acquire (void) const;
// Conditionally decrement the semaphore if
// count greater than 0 (i.e., won’t block).
int try_acquire (void) const;
// Increment the semaphore, potentially
// unblocking a waiting thread.
int release (void) const;
private:
sema_t semaphore_;
};
Thread_Semaphore和Process_Semaphore类继承自Semaphore,并使用它的构造器来创建适当类型的信号量,如下所示:
class Thread_Semaphore : public Semaphore
{
public:
Thread_Semaphore (void): Semaphore (USYNC_THREAD);
};
class Process_Semaphore : public Semaphore
{
public:
Thread_Semaphore (void): Semaphore (USYNC_PROCESS);
};
ACE读者/作者包装序列化对这样一种资源的访问:其内容被搜索要多于被变动。它们封装rwlock_t同步变量,这种变量在Solaris上在本地实现,而在Win32和Pthreads上则由ACE模拟。RW_Mutex接口如下所示:
class RW_Mutex
{
public:
// Initialize a readers/writer lock.
RW_Mutex (int type = USYNC_THREAD,
void *arg = 0);
// Implicitly destroy a readers/writer lock.
?RW_Mutex (void);
// Explicitly destroy a readers/writer lock.
int remove (void);
// Acquire a read lock, but
// block if a writer hold the lock.
int acquire_read (void) const;
// Acquire a write lock, but
// block if any readers or a
// writer hold the lock.
int acquire_write (void) const;
// Conditionally acquire a read lock
// (i.e., won’t block).
int try_acquire_read (void) const;
// Conditionally acquire a write lock
// (i.e., won’t block).
int try_acquire_write (void) const;
// Unlock a readers/writer lock.
int release (void) const;
private:
rwlock_t lock_;
};
注意POSIX Pthreads和Win32线程并不提供rwlock_t类型。为确保代码的可移植性,ACE提供了一种基于现有低级同步机制(比如互斥体和条件变量)的RW_Mutex实现。另外,ACE还提供RW_Thread_Mutex和RW_Process_Mutex实现。
Recursive_Thread_Mutex扩展缺省的Solaris非递归锁定语义。它允许嵌套调用acquire方法,只要拥有该锁的线程也是重获取它的线程。它与Thread_Mutex类一起工作。
缺省地,Solaris提供非递归互斥体。这些语义在某些环境中太过受限。因此,ACE通过Recursive_Thread_Mutex类为递归锁提供支持。递归锁对于回调驱动的C++构架[28, 20]特别有用,在其中构架的事件循环执行对用户定义的代码的回调。因为用户定义的代码有可能随后经由一个方法入口重入构架代码,递归锁对于防止在回调过程中,在构架所持有的锁上发生死锁十分有用。
下面的C++类为Solaris 2.x同步机制实现递归锁语义(注意POSIX Pthreads和Win32在它们的本地线程库中提供递归锁):
class Recursive_Thread_Mutex
{
public:
// Initialize a recursive mutex.
Recursive_Thread_Mutex (const char *name = 0
void *arg = 0);
// Implicitly release a recursive mutex.
?Recursive_Thread_Mutex (void);
// Explicitly release a recursive mutex.
int remove (void);
// Acquire a recursive mutex (will increment
// the nesting level and not deadmutex if
// owner of the mutex calls this method more
// than once).
int acquire (void) const;
// Conditionally acquire a recursive mutex
// (i.e., won’t block).
int try_acquire (void) const;
// Releases a recursive mutex (will not
// release mutex until nesting level == 0).
int release (void) const;
thread_t get_thread_id (void);
// Return the id of the thread that currently
// owns the mutex.
int get_nesting_level (void);
// Return the nesting level of the recursion.
// When a thread has acquired the mutex for the
// first time, the nesting level == 1. The nesting
// level is incremented every time the thread
// acquires the mutex recursively.
private:
void set_nesting_level (int d);
void set_thread_id (thread_t t);
Thread_Mutex nesting_mutex_;
// Guards the state of the nesting level
// and thread id.
Condition lock_available_;
// This is the condition variable that actually
// suspends other waiting threads until the
// mutex is available.
int nesting_level_;
// Current nesting level of the recursion.
thread_t owner_id_;
// Current owner of the lock.
};
下面的代码演示Recursive_Thread_Mutex类中的方法的实现:
Recursive_Thread_Mutex::Recursive_Thread_Mutex
(const char *name, void *arg)
: nesting_level_ (0),
owner_id_ (0),
nesting_mutex (name, arg),
lock_available_ (nesting_mutex_, name, arg)
{
}
// Acquire a recursive lock (will increment
// the nesting level and not deadlock if
// owner of lock calls method more than once).
int Recursive_Thread_Mutex::acquire (void) const
{
thread_t t_id = Thread::self ();
Thread_Mutex_Guard mon (nesting_mutex_);
// If there’s no contention, just
// grab the lock immediately.
if (nesting_level_ == 0)
{
set_thread_id (t_id);
nesting_level_ = 1;
}
// If we already own the lock,
// then increment the nesting level
// and proceed.
else if (t_id == owner_id_)
nesting_level_++;
else
{
// Wait until the nesting level has dropped to
// zero, at which point we can acquire the lock.
while (nesting_level_ > 0)
lock_available_.wait ();
set_thread_id (t_id);
nesting_level_ = 1;
}
return 0;
}
// Releases a recursive lock.
int Recursive_Thread_Mutex::release (void) const
{
thread_t t_id = Thread::self ();
// Automatically acquire mutex.
Thread_Mutex_Guard mon (nesting_mutex_);
nesting_level_--;
if (nesting_level_ == 0)
// Inform waiters that the lock is free.
lock_available_.signal ();
return 0;
}
下面是基于4.4介绍的Atomic_Op COUNTER的变种的一个Recursive_Thread_Mutex的例子。在例中,Atomic_Op在单线程中被多次递归的函数调用:
// Counter is a recursive lock.
typedef Atomic_Op
COUNTER;
// Keep track of the recursion depth.
static COUNTER recursion_depth;
int factorial (int n)
{
if (n <= 1)
{
cout << "recursion depth = "
<< recursion_depth << endl;
return n;
}
else
{
// First call acquires lock, subsequent
// calls increment nesting level.
recursion_depth++;
return factorial (n - 1) * n;
}
}
当recursion_depth计数器增长时,Recursive_Thread_Mutex的使用防止了死锁的发生。尽管这演示了递归锁的行为,它并非是一个非常令人信服的例子。在多线程中执行factorial的程序可能会产生不可预知的结果,因为recursion_depth是一个全局变量,可能会被多个线程控制连续地修改!在这种情况下,一种更为适当的(并且更低廉的)锁定策略将使用4.5.6.4描述的线程专有存储模式[29]。
Null_Mutex类提供一种零开销的通用锁定接口的实现,该接口与其他用于线程和同步的C++包装共享。Null_Mutex的接口和极其简单的实现如下所示:
class Null_Mutex
{
public:
Null_Mutex (void) {}
?Null_Mutex (void) {}
int remove (void) { return 0; }
int acquire (void) const { return 0; }
int try_acquire (void) const { return 0; }
int release (void) const { return 0; }
};
如上面的代码所示,Null_Mutex类将acquire和release方法实现为“空操作”内联函数,编译优化器将把它们完全清除掉。4.6演示Null_Mutex的使用。
该类提供了一种比Mutex更为通用的同步机制。例如,它实现了“递归互斥体”语义,拥有该令牌的线程可以重新获取它,而不会死锁。此外,当其他线程释放Token时,阻塞并等待该Token的线程严格地按照FIFO(先进先出)的顺序被服务。相反,Mutex并不严格地实施一种获取顺序。
Token类的接口如下所示:
class Token
{
public:
// Initialization and termination.
Token (const char *name = 0, void * = 0);
?Token (void);
// Acquire the token, sleeping until it is
// obtained or until expires.
// If some other thread currently holds
// the token then is called
// before our thread goes to sleep.
int acquire (void (*sleep_hook)(void *),
void *arg = 0,
Time_Value *timeout = 0);
// This behaves just like the previous
// method, except that it
// invokes the virtual function called
// that can be overridden
// by a subclass of Token.
int acquire (Time_Value *timeout = 0);
// This should be overridden by a subclass
// to define the appropriate behavior before
// goes to sleep. By default,
// this is a no-op...
virtual void sleep_hook (void);
// An optimized method that efficiently
// reacquires the token if no other threads
// are waiting. This is useful for if you
// don’t want to degrad the quality of
// service if there are other threads
// waiting to get the token.
int renew (int requeue_position = 0,
Time_Value *timeout = 0);
// Become interface-compliant with other
// lock mechanisms (implements a
// non-blocking ).
int tryacquire (void);
// Shuts down the Token instance.
int remove (void);
// Relinquish the token. If there are any
// waiters then the next one in line gets it.
int release (void);
// Return the number of threads that are
// currently waiting to get the token.
int waiters (void);
// Return the id of the current thread that
// owns the token.
thread_t current_owner (void);
};
与C一级的互斥体API相比较,4.5.1.1描述的Mutex包装为同步多线程控制提供了一种优雅的接口。但是,Mutex潜在地容易出错,因为程序员有可能忘记调用release方法(如4.3.7所示)。这可能由于程序员的疏忽或是C++异常的发生而发生。
因此,为改善应用的健壮性,ACE同步机制有效地利用C++类构造器和析构器的语义来确保Mutex锁被自动获取和释放。ACE提供了一个称为Guard、Write_Guard和Read_Guard的类族,确保在进入和退出C++代码块时分别自动获取和释放锁。
Guard类是最基本的守卫机制,定义如下:
template
class Guard
{
public:
// Implicitly and automatically acquire (or try
// to acquire) the lock.
Guard (LOCK &l, int block = 1): lock_ (&l)
{
result_ = block ? acquire () : tryacquire ();
}
// Implicitly release the lock.
?Guard (void)
{
if (result_ != -1)
lock_.release ();
}
// 1 if locked, 0 if can’t acquire lock
// (errno will contain the reason for this).
int locked (void)
{
return result_ != -1;
}
// Explicitly release the lock.
int remove (void)
{
return lock_->remove ();
}
// Explicitly acquire the lock.
int acquire (void)
{
return lock_->acquire ();
}
// Conditionally acquire the lock (i.e., won’t block).
int tryacquire (void)
{
return lock_->tryacquire ();
}
// Explicitly release the lock.
int release (void)
{
return lock_->release ();
}
private:
// Pointer to the LOCK we’re guarding.
LOCK *lock_;
// Tracks if acquired the lock or failed.
int result_;
};
Guard类的对象定义一“块”代码,在其上锁被自动获取,并在退出块时自动释放。注意这种机制也能为Mutex、RW_Mutex和Semaphore同步封装工作。这演示了使用C++包装的另一个好处:通过改编不必要地互不兼容的接口(比如Solaris 2.x信号量和互斥体),这些封装促进了接口的一致性。
缺省地,上面所示的Guard类构造器将会阻塞,直到锁被获取。会有这样的情况,程序必须使用非阻塞的acquire调用(例如,防止死锁)。因此,可以传给ACE Guard的构造器第二个参数,指示它使用锁的try_acquire方法,而不是acquire。随后调用者可以使用Guard的locked方法来原子地测试实际上锁是否已被获取。
Read_Guard和Write_Guard类有着与Guard类相同的接口。但是,它们的acquire方法分别对锁进行读和写。
