4.2.4 杂项同步类
除了上面描述的同步类,
ACE还包括其他一些同步类,比如ACE_Barrier和ACE_Atomic_Op。中的栅栏(Barrier)
栅栏有一个好名字,因为它恰切地描述了栅栏应做的事情。一组线程可以使用栅栏来进行共同的相互同步。组中的每个线程各自执行,直到到达栅栏,就阻塞在那里。在所有相关线程到达栅栏后,它们就全部继续它们的执行。就是说,它们一个接一个地阻塞,等待其他的线程到达栅栏;一旦所有线程都到达了它们的执行路径中的“栅栏点”,它们就一起重新启动。
在
ACE中,栅栏在ACE_Barrier类中实现。在栅栏对象被实例化时,它将要等待的线程的数目会作为参数传入。一旦到达执行路径中的“栅栏点”,每个线程都在栅栏对象上发出wait()调用。它们在这里阻塞,直到其他线程到达它们各自的“栅栏点”,然后再一起继续执行。当栅栏从相关线程那里接收了适当数目的wait()调用时,它就同时唤醒所有阻塞的线程。下面的例子演示怎样通过
ACE使用栅栏:
例
4-7#include "ace/Thread.h"
#include "ace/Synch.h"
static int number=0;
static int seed=0;
class Args
{
public:
Args(ACE_Barrier *barrier): barrier_(barrier){}
ACE_Barrier *barrier_;
};
static void*
worker(void *arguments)
{
Args *arg= (Args*)arguments;
ACE_DEBUG((LM_DEBUG,"Thread (%t) Created to do some work\n"));
::number++;
//Work
ACE_OS::sleep(ACE_OS::rand()%2);
//Exiting now
ACE_DEBUG((LM_DEBUG,
"\tThread (%t) Done! \n\tThe number is now: %d\n",number));
//Let the barrier know we are done.
arg->barrier_->wait();
ACE_DEBUG((LM_DEBUG,"Thread (%t) is exiting \n"));
return 0;
}
int main(int argc, char *argv[])
{
if(argc<2)
{
ACE_DEBUG((LM_DEBUG,"Usage: %s
\n", argv[0])); ACE_OS::exit(1);
}
int n_threads=ACE_OS::atoi(argv[1]);
ACE_DEBUG((LM_DEBUG,"Preparing to spawn %d threads",n_threads));
//Setup the random number generator
ACE_OS::srand(::seed);
//Setup arguments for threads
ACE_Barrier barrier(n_threads);
Args arg(&barrier);
//Spawn off n_threads number of threads
for(int i=0; i { if(ACE_Thread::spawn((ACE_THR_FUNC)worker, (void*)&arg,THR_DETACHED|THR_NEW_LWP)==-1) ACE_DEBUG((LM_DEBUG,"Error in spawning thread\n")); } //Wait for all the other threads to let the main thread // know that they are done using the barrier barrier.wait(); ACE_DEBUG((LM_DEBUG,"(%t)Other threads are finished. Program exiting..\n")); ACE_OS::sleep(2); }
在此例中,主线程创建一个栅栏,并将其传递给工作者线程。每个工作者线程都在就要退出前在栅栏上调用
wait(),从而使它们在完成工作后和就要退出前阻塞住。主线程也在就要退出前阻塞。一旦所有线程(包括主线程)执行结束,它们就会一起退出。原子操作(Atomic Op)
ACE_Atomic_Op
类用于将同步透明地参数化进基本的算术运算中。ACE_Atomic_Op是一种模板类,锁定机制和需要参数化的类型被作为参数传入其中。ACE是这样来实现此机制的:重载所有算术操作符,并确保在操作前获取锁,在操作后释放它。运算本身被委托给通过模板传入的的类。下面的例子演示此类的用法:
例
4-8#include "ace/Synch.h"
//Global mutable and shared data on which we will perform simple
//arithmetic operations which will be protected.
ACE_Atomic_Op
//The worker threads will start from here.
static void* worker(void *arg)
{
ACE_UNUSED_ARG(arg);
foo=5;
ACE_ASSERT (foo == 5);
++foo;
ACE_ASSERT (foo == 6);
--foo;
ACE_ASSERT (foo == 5);
foo += 10;
ACE_ASSERT (foo == 15);
foo -= 10;
ACE_ASSERT (foo == 5);
foo = 5L;
ACE_ASSERT (foo == 5);
return 0;
}
int main(int argc, char *argv[])
{
//spawn threads as in previous examples
}
在上面的程序中,在
foo变量上执行了若干简单的算术运算。在运算之后,执行了断言检查来保证变量的值是它“应该是”的值。你也许想知道为什么这样的算术原语(比如像上例中那样)需要同步。你一定认为增减运算本来就是原子的。
但是,这些运算通常不是原子的。
CPU有可能将指令划分为三个步骤:读变量、增加或减少变量的值,以及回写。在这样的情况下,如果没有使用原子操作,就可能发生下面的情况:
即使没有使用同步原语,上面的例程也可能并不会出错。原因是这种情况下的线程是计算绑定的,
OS不会先占(pre-empt)这样的线程。但是,这样编写的代码是不安全的,因为你不能依赖OS调度器的工作方式。在大多数环境中,任何情况下同步关系都是非确定性的(因为实时效应,像页面错误或定时器的使用;或是因为实际上有多个物理处理器)。使用ACE_THREAD_MANAGER进行线程管理
在前面所有的例子中,我们一直使用
ACE_Thread包装类来创建和销毁线程。但是,该包装类的功能比较有限。ACE_Thread_Manager提供了ACE_Thread中的功能的超集。特别地,它增加了管理功能,以使启动、取消、挂起和恢复一组相关线程变得更为容易。它用于创建和销毁成组的线程和任务(ACE_Task是一种比线程更高级的构造,可在ACE中用于进行多线程编程。我们将在后面再来讨论任务)。它还提供了这样的功能:发送信号给一组线程,或是在一组线程上等待,而不是像我们在前面的例子中所看到的那样,以一种不可移植的方式来调用join()。下面的例子演示怎样使用
ACE_Thread_Manager创建一组线程,然后等待它们的完成。
例
4-9#include "ace/Thread_Manager.h"
#include "ace/Get_Opt.h"
static void* taskone(void*)
{
ACE_DEBUG((LM_DEBUG,"Thread:(%t)started Task one! \n"));
ACE_OS::sleep(2);
ACE_DEBUG((LM_DEBUG,"Thread:(%t)finished Task one!\n"));
return 0;
}
static void* tasktwo(void*)
{
ACE_DEBUG((LM_DEBUG,"Thread:(%t)started Task two!\n"));
ACE_OS::sleep(1);
ACE_DEBUG((LM_DEBUG,"Thread:(%t)finished Task two!\n"));
return 0;
}
static void print_usage_and_die()
{
ACE_DEBUG((LM_DEBUG,"Usage program_name
-a
-b "));
ACE_OS::exit(1);
}
int main(int argc, char* argv[])
{
int num_task_1;
int num_task_2;
if(argc<3)
print_usage_and_die();
ACE_Get_Opt get_opt(argc,argv,"a:b:");
char c;
while( (c=get_opt())!=EOF)
{
switch(c)
{
case ’a’:
num_task_1=ACE_OS::atoi(get_opt.optarg);
break;
case ’b’:
num_task_2=ACE_OS::atoi(get_opt.optarg);
break;
default:
ACE_ERROR((LM_ERROR,"Unknown option\n"));
ACE_OS::exit(1);
}
}
//Spawn the first set of threads that work on task 1.
if(ACE_Thread_Manager::instance()->spawn_n(num_task_1,
(ACE_THR_FUNC)taskone,//Execute task one
0, //No arguments
THR_NEW_LWP, //New Light Weight Process
ACE_DEFAULT_THREAD_PRIORITY,
1)==-1) //Group ID is 1
ACE_ERROR((LM_ERROR,
"Failure to spawn first group of threads: %p \n"));
//Spawn second set of threads that work on task 2.
if(ACE_Thread_Manager::instance()->spawn_n(num_task_2,
(ACE_THR_FUNC)tasktwo,//Execute task one
0, //No arguments
THR_NEW_LWP, //New Light Weight Process
ACE_DEFAULT_THREAD_PRIORITY,
2)==-1)//Group ID is 2
ACE_ERROR((LM_ERROR,
"Failure to spawn second group of threads: %p \n"));
//Wait for all tasks in grp 1 to exit
ACE_Thread_Manager::instance()->wait_grp(1);
ACE_DEBUG((LM_DEBUG,"Tasks in group 1 have exited! Continuing \n"));
//Wait for all tasks in grp 2 to exit
ACE_Thread_Manager::instance()->wait_grp(2);
ACE_DEBUG((LM_DEBUG,"Tasks in group 2 have exited! Continuing \n"));
}
下一个例子演示
ACE_Thread_Manager中的挂起、恢复和协作式取消机制:
例
4-10// Test out the group management mechanisms provided by the
// ACE_Thread_Manager, including the group suspension and resumption,
//and cooperative thread cancellation mechanisms.
#include "ace/Thread_Manager.h"
static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS;
static const int DEFAULT_ITERATIONS = 100000;
static void *
worker (int iterations)
{
for (int i = 0; i < iterations; i++)
{
if ((i % 1000) == 0)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) checking cancellation before iteration %d!\n", i));
//Before doing work check if you have been canceled. If so don’t
//do any more work.
if (ACE_Thread_Manager::instance ()->testcancel
(ACE_Thread::self ()) != 0)
{
ACE_DEBUG ((LM_DEBUG,
"(%t) has been canceled before iteration %d!\n",i));
break;
}
}
}
return 0;
}
int main (int argc, char *argv[])
{
int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS;
int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;
ACE_Thread_Manager *thr_mgr = ACE_Thread_Manager::instance ();
//Create a group of threads n_threads that will execute the worker
//function the spawn_n method returns the group ID for the group of
//threads that are spawned. The argument n_iterations is passed back
//to the worker. Notice that all threads are created detached.
int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker),
(void *) n_iterations,
THR_NEW_LWP | THR_DETACHED);
// Wait for 1 second and then suspend every thread in the group.
ACE_OS::sleep (1);
ACE_DEBUG ((LM_DEBUG, "(%t) suspending group\n"));
if (thr_mgr->suspend_grp (grp_id) == -1)
ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "Could not suspend_grp"));
// Wait for 1 more second and then resume every thread in the
// group.
ACE_OS::sleep (1);
ACE_DEBUG ((LM_DEBUG, "(%t) resuming group\n"));
if (thr_mgr->resume_grp (grp_id) == -1)
ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "resume_grp"));
// Wait for 1 more second and then cancel all the threads.
ACE_OS::sleep (ACE_Time_Value (1));
ACE_DEBUG ((LM_DEBUG, "(%t) canceling group\n"));
if (thr_mgr->cancel_grp (grp_id) == -1)
ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "cancel_grp"));
// Perform a barrier wait until all the threads have shut down.
thr_mgr->wait ();
return 0;
}
在此例中创建了
n_threads个线程来执行worker函数。每个线程在worker函数中执行n_iterations次循环。当这些线程在worker函数中执行循环时,主线程将挂起(suspend())、恢复(resume())、并最终取消它们。Worker函数中的每个线程将使用ACE_Thread_Manager的testcancel()来检查取消情况。线程专有存储(Thread Specific Storage)
如果单线程程序希望创建一个变量,其值能在多个函数调用间持续,它可以静态或全局地分配该数据。如果这是个多线程程序,这个全局或静态数据对于所有线程来说都是一样的。这可能是,也可能不是我们所期望的。例如,伪随机数生成器可能需要静态的或全局的整型种子变量,它不会因为多个线程同时改变它的值而受到影响。但是,在另外的情形中,对于各个线程来说,可能需要不同的全局或静态数据。例如,设想一个多线程的
GUI应用,在其中各个窗口都运行在独立的线程中,并各拥有一个输入端口,窗口从中接收事件输入。这样的输入端口必须在窗口的各个函数调用之间保持“持续”,并且还必须是窗口专有的或私有的。可使用线程专有存储来满足此需求。像输入端口这样的结构可放在线程专有存储中,并可像逻辑上的静态或全局变量一样被访问;而实际上它对线程来说是私有的。传统上,线程专有存储通过让人迷惑的底层操作系统
API来实现。在ACE中,TSS通过使用ACE_TSS模板类来实现。需要成为线程专有的类被传入ACE_TSS模板,然后可以使用C++的->操作符来调用它的全部公共方法。下面的例子演示在
ACE中使用线程专有存储是多么简单:
例
4-11#include "ace/Synch.h"
#include "ace/Thread_Manager.h"
class DataType
{
public:
DataType():data(0){}
void increment(){ data++;}
void set(int new_data){ data=new_data;}
void decrement(){ data--;}
int get(){return data;}
private:
int data;
};
ACE_TSS
static void* thread1(void*)
{
data->set(10);
ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));
for(int i=0;i<5;i++)
data->increment();
ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));
return 0;
}
static void * thread2(void*)
{
data->set(100);
ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));
for(int i=0; i<5;i++)
data->increment();
ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));
return 0;
}
int main(int argc, char*argv[])
{
//Spawn off the first thread
ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)thread1,0,THR_NEW_LWP|
THR_DETACHED);
//Spawn off the second thread
ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)thread2,0,THR_NEW_LWP| THR_DETACHED);
//Wait for all threads in the manager to complete.
ACE_Thread_Manager::instance()->wait();
ACE_DEBUG((LM_DEBUG,"Both threads done.Exiting.. \n"));
}
在上面的例子中,
DataType类是在线程专有存储中创建的。随后程序使用->操作符来从函数thread1和thread2访问此类的方法,这两个函数分别在不同的线程中执行。第一个线程将私有数据变量设置为10,然后增加5,将它变为15。第二个线程使用它自己的私有数据变量,将它的值设为100,并增加5,变成105。尽管数据看起来是全局的,它实际上是线程专有的,而且两个线程分别打印出15和105,也说明了这一点。尽可能使用线程专有存储有若干好处。如果全局或静态数据可放在线程专有存储中,就可将同步所导致的开销降到最低。这是使用
TSS的主要好处。