贝贝花花包包店,精品555皮具,钱夹,皮夹

字体: | 推荐给好友 上一篇 | 下一篇

ACE程序员教程-第4章 线程管理:ACE的同步和线程管理机制

发布: 2008-6-13 14:28 | 作者: Umar Syyid | 来源: 转载 | 查看: 328次

 

4.2.4 杂项同步类

  除了上面描述的同步类,ACE还包括其他一些同步类,比如ACE_BarrierACE_Atomic_Op

 

4.2.4.1 ACE中的栅栏(Barrier

  栅栏有一个好名字,因为它恰切地描述了栅栏应做的事情。一组线程可以使用栅栏来进行共同的相互同步。组中的每个线程各自执行,直到到达栅栏,就阻塞在那里。在所有相关线程到达栅栏后,它们就全部继续它们的执行。就是说,它们一个接一个地阻塞,等待其他的线程到达栅栏;一旦所有线程都到达了它们的执行路径中的“栅栏点”,它们就一起重新启动。

  在ACE中,栅栏在ACE_Barrier类中实现。在栅栏对象被实例化时,它将要等待的线程的数目会作为参数传入。一旦到达执行路径中的“栅栏点”,每个线程都在栅栏对象上发出wait()调用。它们在这里阻塞,直到其他线程到达它们各自的“栅栏点”,然后再一起继续执行。当栅栏从相关线程那里接收了适当数目的wait()调用时,它就同时唤醒所有阻塞的线程。

  下面的例子演示怎样通过ACE使用栅栏:

 

4-7

#include "ace/Thread.h"

#include "ace/Synch.h"

 

static int number=0;

static int seed=0;

 

class Args

{

public:

Args(ACE_Barrier *barrier): barrier_(barrier){}

ACE_Barrier *barrier_;

};

 

static void*

worker(void *arguments)

{

Args *arg= (Args*)arguments;

ACE_DEBUG((LM_DEBUG,"Thread (%t) Created to do some work\n"));

::number++;

 

//Work

ACE_OS::sleep(ACE_OS::rand()%2);

 

//Exiting now

ACE_DEBUG((LM_DEBUG,

"\tThread (%t) Done! \n\tThe number is now: %d\n",number));

 

//Let the barrier know we are done.

arg->barrier_->wait();

ACE_DEBUG((LM_DEBUG,"Thread (%t) is exiting \n"));

 

return 0;

}

 

int main(int argc, char *argv[])

{

if(argc<2)

{

ACE_DEBUG((LM_DEBUG,"Usage: %s \n", argv[0]));

ACE_OS::exit(1);

}

 

int n_threads=ACE_OS::atoi(argv[1]);

ACE_DEBUG((LM_DEBUG,"Preparing to spawn %d threads",n_threads));

 

//Setup the random number generator

ACE_OS::srand(::seed);

 

//Setup arguments for threads

ACE_Barrier barrier(n_threads);

Args arg(&barrier);

 

//Spawn off n_threads number of threads

for(int i=0; i

{

if(ACE_Thread::spawn((ACE_THR_FUNC)worker,

(void*)&arg,THR_DETACHED|THR_NEW_LWP)==-1)

ACE_DEBUG((LM_DEBUG,"Error in spawning thread\n"));

}

 

//Wait for all the other threads to let the main thread

// know that they are done using the barrier

barrier.wait();

ACE_DEBUG((LM_DEBUG,"(%t)Other threads are finished. Program exiting..\n"));

ACE_OS::sleep(2);

}

 

在此例中,主线程创建一个栅栏,并将其传递给工作者线程。每个工作者线程都在就要退出前在栅栏上调用wait(),从而使它们在完成工作后和就要退出前阻塞住。主线程也在就要退出前阻塞。一旦所有线程(包括主线程)执行结束,它们就会一起退出。

 

4.2.4.2 原子操作(Atomic Op

  ACE_Atomic_Op类用于将同步透明地参数化进基本的算术运算中。ACE_Atomic_Op是一种模板类,锁定机制和需要参数化的类型被作为参数传入其中。ACE是这样来实现此机制的:重载所有算术操作符,并确保在操作前获取锁,在操作后释放它。运算本身被委托给通过模板传入的的类。

  下面的例子演示此类的用法:

 

4-8

#include "ace/Synch.h"

 

//Global mutable and shared data on which we will perform simple

//arithmetic operations which will be protected.

ACE_Atomic_Op foo;

 

//The worker threads will start from here.

static void* worker(void *arg)

{

ACE_UNUSED_ARG(arg);

 

foo=5;

ACE_ASSERT (foo == 5);

 

++foo;

ACE_ASSERT (foo == 6);

 

--foo;

ACE_ASSERT (foo == 5);

 

foo += 10;

ACE_ASSERT (foo == 15);

 

foo -= 10;

ACE_ASSERT (foo == 5);

 

foo = 5L;

ACE_ASSERT (foo == 5);

 

return 0;

}

 

int main(int argc, char *argv[])

{

//spawn threads as in previous examples

}

 

在上面的程序中,在foo变量上执行了若干简单的算术运算。在运算之后,执行了断言检查来保证变量的值是它“应该是”的值。

  你也许想知道为什么这样的算术原语(比如像上例中那样)需要同步。你一定认为增减运算本来就是原子的。

  但是,这些运算通常不是原子的。CPU有可能将指令划分为三个步骤:读变量、增加或减少变量的值,以及回写。在这样的情况下,如果没有使用原子操作,就可能发生下面的情况:

 

  • 线程一读变量,增加它的值,还未及将新值写回,就被OS调换出去。
  • 线程二读取变量的旧值,增加它并写回新的增加了的值。
  • 线程一用它自己的值覆盖了线程二的增量。

 

  即使没有使用同步原语,上面的例程也可能并不会出错。原因是这种情况下的线程是计算绑定的,OS不会先占(pre-empt)这样的线程。但是,这样编写的代码是不安全的,因为你不能依赖OS调度器的工作方式。在大多数环境中,任何情况下同步关系都是非确定性的(因为实时效应,像页面错误或定时器的使用;或是因为实际上有多个物理处理器)。

 

4.3 使用ACE_THREAD_MANAGER进行线程管理

  在前面所有的例子中,我们一直使用ACE_Thread包装类来创建和销毁线程。但是,该包装类的功能比较有限。ACE_Thread_Manager提供了ACE_Thread中的功能的超集。特别地,它增加了管理功能,以使启动、取消、挂起和恢复一组相关线程变得更为容易。它用于创建和销毁成组的线程和任务(ACE_Task是一种比线程更高级的构造,可在ACE中用于进行多线程编程。我们将在后面再来讨论任务)。它还提供了这样的功能:发送信号给一组线程,或是在一组线程上等待,而不是像我们在前面的例子中所看到的那样,以一种不可移植的方式来调用join()

  下面的例子演示怎样使用ACE_Thread_Manager创建一组线程,然后等待它们的完成。

 

4-9

#include "ace/Thread_Manager.h"

#include "ace/Get_Opt.h"

 

static void* taskone(void*)

{

ACE_DEBUG((LM_DEBUG,"Thread:(%t)started Task one! \n"));

ACE_OS::sleep(2);

ACE_DEBUG((LM_DEBUG,"Thread:(%t)finished Task one!\n"));

return 0;

}

 

static void* tasktwo(void*)

{

ACE_DEBUG((LM_DEBUG,"Thread:(%t)started Task two!\n"));

ACE_OS::sleep(1);

ACE_DEBUG((LM_DEBUG,"Thread:(%t)finished Task two!\n"));

return 0;

}

 

static void print_usage_and_die()

{

ACE_DEBUG((LM_DEBUG,"Usage program_name

-a -b"));

ACE_OS::exit(1);

}

 

int main(int argc, char* argv[])

{

int num_task_1;

int num_task_2;

 

if(argc<3)

print_usage_and_die();

 

ACE_Get_Opt get_opt(argc,argv,"a:b:");

 

char c;

while( (c=get_opt())!=EOF)

{

switch(c)

{

case ’a’:

num_task_1=ACE_OS::atoi(get_opt.optarg);

break;

case ’b’:

num_task_2=ACE_OS::atoi(get_opt.optarg);

break;

default:

ACE_ERROR((LM_ERROR,"Unknown option\n"));

ACE_OS::exit(1);

}

}

 

//Spawn the first set of threads that work on task 1.

if(ACE_Thread_Manager::instance()->spawn_n(num_task_1,

(ACE_THR_FUNC)taskone,//Execute task one

0, //No arguments

THR_NEW_LWP, //New Light Weight Process

ACE_DEFAULT_THREAD_PRIORITY,

1)==-1) //Group ID is 1

ACE_ERROR((LM_ERROR,

"Failure to spawn first group of threads: %p \n"));

 

//Spawn second set of threads that work on task 2.

if(ACE_Thread_Manager::instance()->spawn_n(num_task_2,

(ACE_THR_FUNC)tasktwo,//Execute task one

0, //No arguments

THR_NEW_LWP, //New Light Weight Process

ACE_DEFAULT_THREAD_PRIORITY,

2)==-1)//Group ID is 2

ACE_ERROR((LM_ERROR,

"Failure to spawn second group of threads: %p \n"));

 

//Wait for all tasks in grp 1 to exit

ACE_Thread_Manager::instance()->wait_grp(1);

ACE_DEBUG((LM_DEBUG,"Tasks in group 1 have exited! Continuing \n"));

 

//Wait for all tasks in grp 2 to exit

ACE_Thread_Manager::instance()->wait_grp(2);

 

ACE_DEBUG((LM_DEBUG,"Tasks in group 2 have exited! Continuing \n"));

}

 

  下一个例子演示ACE_Thread_Manager中的挂起、恢复和协作式取消机制:

 

4-10

// Test out the group management mechanisms provided by the

// ACE_Thread_Manager, including the group suspension and resumption,

//and cooperative thread cancellation mechanisms.

#include "ace/Thread_Manager.h"

static const int DEFAULT_THREADS = ACE_DEFAULT_THREADS;

static const int DEFAULT_ITERATIONS = 100000;

 

static void *

worker (int iterations)

{

for (int i = 0; i < iterations; i++)

{

if ((i % 1000) == 0)

{

ACE_DEBUG ((LM_DEBUG,

"(%t) checking cancellation before iteration %d!\n", i));

 

//Before doing work check if you have been canceled. If so don’t

//do any more work.

if (ACE_Thread_Manager::instance ()->testcancel

(ACE_Thread::self ()) != 0)

{

ACE_DEBUG ((LM_DEBUG,

"(%t) has been canceled before iteration %d!\n",i));

break;

}

}

}

 

return 0;

}

 

int main (int argc, char *argv[])

{

int n_threads = argc > 1 ? ACE_OS::atoi (argv[1]) : DEFAULT_THREADS;

int n_iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : DEFAULT_ITERATIONS;

 

ACE_Thread_Manager *thr_mgr = ACE_Thread_Manager::instance ();

 

//Create a group of threads n_threads that will execute the worker

//function the spawn_n method returns the group ID for the group of

//threads that are spawned. The argument n_iterations is passed back

//to the worker. Notice that all threads are created detached.

int grp_id = thr_mgr->spawn_n (n_threads, ACE_THR_FUNC (worker),

(void *) n_iterations,

THR_NEW_LWP | THR_DETACHED);

 

// Wait for 1 second and then suspend every thread in the group.

ACE_OS::sleep (1);

ACE_DEBUG ((LM_DEBUG, "(%t) suspending group\n"));

if (thr_mgr->suspend_grp (grp_id) == -1)

ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "Could not suspend_grp"));

 

// Wait for 1 more second and then resume every thread in the

// group.

ACE_OS::sleep (1);

ACE_DEBUG ((LM_DEBUG, "(%t) resuming group\n"));

if (thr_mgr->resume_grp (grp_id) == -1)

ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "resume_grp"));

 

// Wait for 1 more second and then cancel all the threads.

ACE_OS::sleep (ACE_Time_Value (1));

ACE_DEBUG ((LM_DEBUG, "(%t) canceling group\n"));

if (thr_mgr->cancel_grp (grp_id) == -1)

ACE_ERROR ((LM_DEBUG, "(%t) %p\n", "cancel_grp"));

 

// Perform a barrier wait until all the threads have shut down.

thr_mgr->wait ();

 

return 0;

}

 

在此例中创建了n_threads个线程来执行worker函数。每个线程在worker函数中执行n_iterations次循环。当这些线程在worker函数中执行循环时,主线程将挂起(suspend())、恢复(resume())、并最终取消它们。Worker函数中的每个线程将使用ACE_Thread_Managertestcancel()来检查取消情况。

 

4.4 线程专有存储(Thread Specific Storage

  如果单线程程序希望创建一个变量,其值能在多个函数调用间持续,它可以静态或全局地分配该数据。如果这是个多线程程序,这个全局或静态数据对于所有线程来说都是一样的。这可能是,也可能不是我们所期望的。例如,伪随机数生成器可能需要静态的或全局的整型种子变量,它不会因为多个线程同时改变它的值而受到影响。但是,在另外的情形中,对于各个线程来说,可能需要不同的全局或静态数据。例如,设想一个多线程的GUI应用,在其中各个窗口都运行在独立的线程中,并各拥有一个输入端口,窗口从中接收事件输入。这样的输入端口必须在窗口的各个函数调用之间保持“持续”,并且还必须是窗口专有的或私有的。可使用线程专有存储来满足此需求。像输入端口这样的结构可放在线程专有存储中,并可像逻辑上的静态或全局变量一样被访问;而实际上它对线程来说是私有的。

  传统上,线程专有存储通过让人迷惑的底层操作系统API来实现。在ACE中,TSS通过使用ACE_TSS模板类来实现。需要成为线程专有的类被传入ACE_TSS模板,然后可以使用C++->操作符来调用它的全部公共方法。

  下面的例子演示在ACE中使用线程专有存储是多么简单:

 

4-11

#include "ace/Synch.h"

#include "ace/Thread_Manager.h"

 

class DataType

{

public:

DataType():data(0){}

void increment(){ data++;}

void set(int new_data){ data=new_data;}

void decrement(){ data--;}

int get(){return data;}

 

private:

int data;

};

 

ACE_TSS data;

 

static void* thread1(void*)

{

data->set(10);

ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));

for(int i=0;i<5;i++)

data->increment();

ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));

return 0;

}

 

static void * thread2(void*)

{

data->set(100);

ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));

for(int i=0; i<5;i++)

data->increment();

ACE_DEBUG((LM_DEBUG,"(%t)The value of data is %d \n",data->get()));

return 0;

}

 

int main(int argc, char*argv[])

{

//Spawn off the first thread

ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)thread1,0,THR_NEW_LWP|

THR_DETACHED);

 

//Spawn off the second thread

ACE_Thread_Manager::instance()->spawn((ACE_THR_FUNC)thread2,0,THR_NEW_LWP| THR_DETACHED);

 

//Wait for all threads in the manager to complete.

ACE_Thread_Manager::instance()->wait();

ACE_DEBUG((LM_DEBUG,"Both threads done.Exiting.. \n"));

}

 

在上面的例子中,DataType类是在线程专有存储中创建的。随后程序使用->操作符来从函数thread1thread2访问此类的方法,这两个函数分别在不同的线程中执行。第一个线程将私有数据变量设置为10,然后增加5,将它变为15。第二个线程使用它自己的私有数据变量,将它的值设为100,并增加5,变成105。尽管数据看起来是全局的,它实际上是线程专有的,而且两个线程分别打印出15105,也说明了这一点。

  尽可能使用线程专有存储有若干好处。如果全局或静态数据可放在线程专有存储中,就可将同步所导致的开销降到最低。这是使用TSS的主要好处。

44/4<1234
 

评分:0

我来说两句

seccode