贝贝花花包包店,精品555皮具,钱夹,皮夹

字体: | 推荐给好友 上一篇 | 下一篇

ACE程序员教程-第9章 消息队列(Message Queue)

发布: 2008-6-13 13:51 | 作者: Umar Syyid | 来源: 转载 | 查看: 395次

9.5 动态或实时消息队列

  如上面所提到的,动态消息队列是其中的消息的优先级随时间变化的队列。实时应用需要这样的行为特性,因而这样的队列在实时应用中天生更为有用。

  ACE目前提供两种动态消息队列:基于最终期限(deadline)的和基于松弛度(laxity)的(参见[IX])动态消息队列。基于最终期限的消息队列通过每个消息的最终期限来设置它们的优先级。在使用最早deadline优先算法来调用dequeue_head()方法时,队列中有着最早的最终期限的消息块将最先出队。而基于松弛度的消息队列,同时使用执行时间和最终期限来计算松弛度,并将其用于划分各个消息块的优先级。松弛度是十分有用的,因为在根据最终期限来调度时,被调度的任务有可能有最早的最终期限,但同时又有相当长的执行时间,以致于即使它被立即调度,也不能够完成。这会消极地影响其它任务,因为它可能阻塞那些可以调度的任务。松弛度把这样的长执行时间考虑在内,并保证任务如果不能完成,就不会被调度。松弛度队列中的调度基于最小松弛度优先算法。

  基于松弛度的消息队列和基于最终期限的消息队列都实现为ACE_Dynamic_Message_QueueACE使用策略STRATEGY)模式来为动态队列提供不同的调度特性。每种消息队列使用不同的“策略”对象来动态地设置消息队列中消息的优先级。每个这样的“策略”对象都封装了一种不同的算法来基于执行时间、最终期限,等等,计算优先级;并且无论何时消息入队或是出队,都会调用这些策略对象来完成前述计算工作。(有关策略模式的更多信息,请参见“设计模式”)。消息策略模式派生自ACE_Dynamic_Message_Strategy,目前有两种策略可用:ACE_Laxity_Message_StrategyACE_Deadline_Message_Strategy。因此,要创建基于松弛度的动态消息队列,首先必须创建ACE_Laxity_Message_Strategy对象。随后,应该对ACE_Dynamic_Message_Queue对象进行实例化,并将新创建的策略对象作为参数之一传给它的构造器。

 

 

9-2 ACE_Dynamic_Message_QueueACE_Laxity_Message_Strategy的关系

 

创建消息队列

  为简化这些不同类型的消息队列的创建,ACE提供了名为ACE_Message_Queue_Factory的具体消息队列工厂,
它使用工厂方法(
FACTORY METHOD,更多信息参见“设计模式”)模式的一种变种来创建适当类型的消息队列。
消息队列工厂有三个静态的工厂方法,可用来创建三种不同类型的消息队列:

static ACE_Message_Queue *

create_static_message_queue();

 

static ACE_Dynamic_Message_Queue *

create_deadline_message_queue();

 

static ACE_Dynamic_Message_Queue *

create_laxity_message_queue();

  每个方法都返回指向刚创建的消息队列的指针。注意这些方法都是静态的,而create_static_message_queue()方法返回的是ACE_Message_Queue,其它两个方法则返回ACE_Dynamic_Message_Queue

  下面这个简单的例子演示动态和静态消息队列的创建和使用:

9-3

 

#include ”ace/Message_Queue.h”

#include ”ace/Get_Opt.h”

#include ”ace/OS.h”

 

class Args

{

public:

Args(int argc, char*argv[],int& no_msgs, int& time,

ACE_Message_Queue*&mq)

{

ACE_Get_Opt get_opts(argc,argv,”h:l:t:n:xsd”);

while((opt=get_opts())!=-1)

switch(opt)

{

case ’t’:

time=ACE_OS::atoi(get_opts.optarg);

ACE_DEBUG((LM_INFO,”Time: %d \n”,time));

break;

 

case ’n’:

no_msgs=ACE_OS::atoi(get_opts.optarg);

ACE_DEBUG((LM_INFO,”Number of Messages %d \n”,no_msgs));

break;

 

case ’x’:

mq=ACE_Message_Queue_Factory::

create_laxity_message_queue();

ACE_DEBUG((LM_DEBUG,”Creating laxity q\n”));

break;

 

case ’d’:

mq=ACE_Message_Queue_Factory::

create_deadline_message_queue();

ACE_DEBUG((LM_DEBUG,”Creating deadline q\n”));

break;

 

case ’s’:

mq=ACE_Message_Queue_Factory::

create_static_message_queue();

ACE_DEBUG((LM_DEBUG,”Creating static q\n”));

break;

 

case ’h’:

hwm=ACE_OS::atoi(get_opts.optarg);

mq->high_water_mark(hwm);

ACE_DEBUG((LM_INFO,”High Water Mark %d msgs \n”,hwm));

break;

 

case ’l’:

lwm=ACE_OS::atoi(get_opts.optarg);

mq->low_water_mark(lwm);

ACE_DEBUG((LM_INFO,”Low Water Mark %d msgs \n”,lwm));

break;

 

default:

ACE_DEBUG((LM_ERROR,”Usage specify queue type\n”));

break;

}

}

 

private:

int opt;

int hwm;

int lwm;

};

 

class Qtest

{

public:

QTest(int argc, char*argv[])

{

args_ = new Args(argc,argv,no_msgs_,time_,mq_);

array_ =new ACE_Message_Block*[no_msgs_];

}

 

int start_test()

{

for(int i=0; i

{

ACE_NEW_RETURN (array_[i], ACE_Message_Block (1), -1);

set_deadline(i);

set_execution_time(i);

enqueue(i);

}

 

this->dequeue_all();

 

return 0;

}

 

//Call the underlying ACE_Message_Block method msg_deadline_time() to

//set the deadline of the message.

void set_deadline(int msg_no)

{

float temp=(float) time_/(msg_no+1);

ACE_Time_Value tv;

tv.set(temp);

ACE_Time_Value deadline(ACE_OS::gettimeofday()+tv);

array_[msg_no]->msg_deadline_time(deadline);

ACE_DEBUG((LM_INFO,”EQ’d with DLine %d:%d,”, deadline.sec(),deadline.usec()));

}

 

//Call the underlying ACE_Message_Block method to set the execution time

void set_execution_time(int msg_no)

{

float temp=(float) time_/10*msg_no;

ACE_Time_Value tv;

tv.set(temp);

ACE_Time_Value xtime(ACE_OS::gettimeofday()+tv);

array_[msg_no]->msg_execution_time (xtime);

ACE_DEBUG((LM_INFO,”Xtime %d:%d,”,xtime.sec(),xtime.usec()));

}

 

void enqueue(int msg_no)

{

//Set the value of data at the read position

*array_[msg_no]->rd_ptr()=msg_no;

 

//Advance write pointer

array_[msg_no]->wr_ptr(1);

 

//Enqueue on the message queue

if(mq_->enqueue_prio(array_[msg_no])==-1)

{

ACE_DEBUG((LM_ERROR,”\nCould not enqueue on to mq!!\n”));

return;

}

 

ACE_DEBUG((LM_INFO,”Data %d\n”,*array_[msg_no]->rd_ptr()));

}

 

void dequeue_all()

{

ACE_DEBUG((LM_INFO,”Beginning DQ \n”));

ACE_DEBUG((LM_INFO,”No. of Messages on Q:%d Bytes on Q:%d \n”,

mq_->message_count(),mq_->message_bytes()));

 

for(int i=0;i

{

ACE_Message_Block *mb;

if(mq_->dequeue_head(mb)==-1)

{

ACE_DEBUG((LM_ERROR,”\nCould not dequeue from mq!!\n”));

return;

}

 

ACE_DEBUG((LM_INFO,”DQ’d data %d\n”,*mb->rd_ptr()));

}

}

 

private:

Args *args_;

ACE_Message_Block **array_;

ACE_Message_Queue *mq_;

int no_msgs_;

int time_;

};

 

int main(int argc, char* argv[])

{

QTest test(argc,argv);

if(test.start_test()<0)

ACE_DEBUG((LM_ERROR,”Program failure \n”));

}

 

 

 

上面这个例子和前面的例子很相似,只是在其中增加了动态消息队列。在Args类中,我们增加了选项来使用ACE_Message_Queue_Factory创建所有不同类型的消息队列。此外,在Qtest类中增加了两个新方法,用以在每个消息块创建时设置它们的最终期限和执行时间(set_deadline()set_execution_time())。这两个方法使用了ACE_Message_Blockmsg_execution_time()msg_deadline_time()方法。注意它们采用的是绝对时间而非相对时间,这也是它们和ACE_OS::gettimeofday()方法一起使用的原因。

  最终期限和执行时间通过time参数的帮助来设置。最终期限是这样来设置的:第一个消息将拥有最后的最终期限,在最终期限消息队列的情形中,它应该最后被调度。但是在使用松弛度队列时,执行时间和最终期限都将被考虑在内。

33/3<123
 

评分:0

我来说两句

seccode