例
9-1a#ifndef MQ_EG1_H_
#define MQ_EG1_H_
#include "ace/Message_Queue.h"
class QTest
{
public:
//Constructor creates a message queue with no synchronization
QTest(int num_msgs);
//Enqueue the num of messages required onto the message mq.
int enq_msgs();
//Dequeue all the messages previously enqueued.
int deq_msgs ();
private:
//Underlying message queue
ACE_Message_Queue
//Number of messages to enqueue.
int no_msgs_;
};
#endif /*MQ_EG1.H_*/
例
9-1b#include "mq_eg1.h"
QTest::QTest(int num_msgs)
:no_msgs_(num_msgs)
{
ACE_TRACE("QTest::QTest");
//First create a message queue of default size.
if(!(this->mq_=new ACE_Message_Queue
ACE_DEBUG((LM_ERROR,"Error in message queue initialization \n"));
}
int
QTest::enq_msgs(){
ACE_TRACE("QTest::enq_msg");
for(int i=0; i { //create a new message block specifying exactly how large //an underlying data block should be created. ACE_Message_Block *mb; ACE_NEW_RETURN(mb, //Insert data into the message block using the wr_ptr ACE_OS::sprintf(mb->wr_ptr(), "This is message %d\n", i); //Be careful to advance the wr_ptr by the necessary amount. //Note that the argument is of type "size_t" that is mapped to //bytes. mb->wr_ptr(ACE_OS::strlen("This is message 1\n")); //Enqueue the message block onto the message queue if(this->mq_->enqueue_prio(mb)==-1) { ACE_DEBUG((LM_ERROR,"\nCould not enqueue on to mq!!\n")); return -1; } ACE_DEBUG((LM_INFO,"EQ’d data: %s\n", mb->rd_ptr() )); } //end for //Now dequeue all the messages this->deq_msgs(); return 0; } int
ACE_Message_Block(ACE_OS::strlen("This is message 1\n")),-1);
{
ACE_TRACE("QTest::dequeue_all");
ACE_DEBUG((LM_INFO,"No. of Messages on Q:%d Bytes on Q:%d \n"
,mq_->message_count(),mq_->message_bytes()));
ACE_Message_Block *mb;
//dequeue the head of the message queue until no more messages are
//left. Note that I am overwriting the message block mb and I since
//I am using the dequeue_head() method I dont have to worry about
//resetting the rd_ptr() as I did for the wrt_ptr()
for(int i=0;i { mq_->dequeue_head(mb); ACE_DEBUG((LM_INFO,"DQ’d data %s\n", mb->rd_ptr() )); //Release the memory associated with the mb mb->release(); } return 0; } int main(int argc, char* argv[]) { if(argc <2) ACE_ERROR_RETURN((LM_ERROR, "Usage %s num_msgs", argv[0]), -1); QTest test(ACE_OS::atoi(argv[1])); if(test.enq_msgs() == -1) ACE_ERROR_RETURN( (LM_ERROR,"Program failure \n"), -1); }
上例演示了消息队列类的若干方法。例子由一个
Qtest类组成,它通过ACE_NULL_SYNCH锁定来实例化缺省大小的消息队列。锁(互斥体和条件变量)被消息队列用来:在此例中,因为只有一个线程,消息队列的模板同步参数被设置为空(
ACE_NULL_SYNCH,意味着使用ACE_Null_Mutex和ACE_Null_Condition)。随后Qtest的enq_msgs()方法被调用,它进入循环,创建消息、并将其放入消息队列中。消息数据的大小作为参数传给ACE_Message_Block的构造器。使用该构造器使得内存被自动地管理(也就是,内存将在消息块被删除时,即release()时被释放)。wr_ptr随后被获取(使用wr_ptr()访问方法),且数据被拷贝进消息块。在此之后,wr_ptr向前增长。然后使用消息队列的enqueue_prio()方法来实际地将消息块放入底层消息队列中。在
no_msgs_个消息块被创建、初始化和插入消息队列后,enq_msgs()调用deq_msgs()方法。该方法使用ACE_Message_Queue的dequeue_head()方法来使消息队列中的每个消息出队。在消息出队后,就显示它的数据,然后再释放消息。 水位标 水位标用于在消息队列中指示何时在其中的数据已过多(消息队列到达了高水位标),或何时在其中的数据的数量不足(消息队列到达了低水位标)。两种水位标都用于流量控制棗例如,low_water_mark可用于避免像TCP中的“傻窗口综合症”(silly window syndrome)那样的情况,而high_water_mark可用于“阻止“或减缓数据的发送或生产。 ACE中的消息队列通过维护已经入队的总数据量的字节计数来获得这些功能。因而,无论何时有新消息块被放入消息队列中,消息队列都将先确定它的长度,然后检查是否能将此消息块放入队列中(也就是,确认如果将此消息块入队,消息队列没有超过它的高水位标)。如果消息队列不能将数据入队,而它又持有一个锁(也就是,使用了ACE_SYNC,而不是ACE_NULL_SYNCH作为消息队列的模板参数),它就会阻塞调用者,直到有足够的空间可用,或是入队方法的超时(timeout)到期。如果超时已到期,或是队列持有一个空锁,入队方法就会返回-1,指示无法将消息入队。类似地,当
ACE_Message_Queue的dequeue_head方法被调用时,它检查并确认在出队之后,剩下的数据的数量高于低水位标。如果不是这样,而它又持有一个锁,它就会阻塞;否则就返回-1,指示失败(和入队方法的工作方式一样)。分别有两个方法可用于设置和获取高低水位标:
//get the high water mark
size_t high_water_mark(void)
//set the high water mark
void high_water_mark(size_t hwm);
//get the low water_mark
size_t low_water_mark(void)
//set the low water_mark
void low_water_mark(size_t lwm)
使用消息队列迭代器(Message Queue Iterator)和其它容器类的常见情况一样,可将前进(forward)和后退(reverse)迭代器用于ACE中的消息队列。这两个迭代器名为ACE_Message_Queue_Iterator和ACE_Message_Queue_Reverse_Iterator。它们都需要一个模板参数,用于在遍历消息队列时进行同步。如果有多个线程使用消息队列,该参数就应设为ACE_SYNCH;否则,就可设为ACE_NULL_SYNCH。在迭代器对象被创建时,必须将我们想要进行迭代的消息队列的引用传给它的构造器。
下面的例子演示水位标和迭代器的使用:
例
9-2 #include ”ace/Message_Queue.h”#include ”ace/Get_Opt.h”
#include ”ace/Malloc_T.h”
#define SIZE_BLOCK 1
class Args
{
public:
Args(int argc, char*argv[],int& no_msgs, ACE_Message_Queue
{
ACE_Get_Opt get_opts(argc,argv,”h:l:t:n:xsd”);
while((opt=get_opts())!=-1)
switch(opt)
{
case ’n’:
//set the number of messages we wish to enqueue and dequeue
no_msgs=ACE_OS::atoi(get_opts.optarg);
ACE_DEBUG((LM_INFO,”Number of Messages %d \n”,no_msgs));
break;
case ’h’:
//set the high water mark
hwm=ACE_OS::atoi(get_opts.optarg);
mq->high_water_mark(hwm);
ACE_DEBUG((LM_INFO,”High Water Mark %d msgs \n”,hwm));
break;
case ’l’:
//set the low water mark
lwm=ACE_OS::atoi(get_opts.optarg);
mq->low_water_mark(lwm);
ACE_DEBUG((LM_INFO,”Low Water Mark %d msgs \n”,lwm));
break;
default:
ACE_DEBUG((LM_ERROR,
”Usage -n
-h -l \n”)); break;
}
}
private:
int opt;
int hwm;
int lwm;
};
class Qtest
{
public:
QTest(int argc, char*argv[])
{
//First create a message queue of default size.
if(!(this->mq_=new ACE_Message_Queue
())) ACE_DEBUG((LM_ERROR,”Error in message queue initialization \n”));
//Use the arguments to set the water marks and the no of messages
args_ = new Args(argc,argv,no_msgs_,mq_);
}
int start_test()
{
for(int i=0; i
{
//Create a new message block of data buffer size 1
ACE_Message_Block * mb= new ACE_Message_Block(SIZE_BLOCK);
//Insert data into the message block using the rd_ptr
*mb->wr_ptr()=i;
//Be careful to advance the wr_ptr
mb->wr_ptr(1);
//Enqueue the message block onto the message queue
if(this->mq_->enqueue_prio(mb)==-1)
{
ACE_DEBUG((LM_ERROR,”\nCould not enqueue on to mq!!\n”));
return -1;
}
ACE_DEBUG((LM_INFO,”EQ’d data: %d\n”,*mb->rd_ptr()));
}
//Use the iterators to read
this->read_all();
//Dequeue all the messages
this->dequeue_all();
return 0;
}
void read_all()
{
ACE_DEBUG((LM_INFO,”No. of Messages on Q:%d Bytes on Q:%d \n”
,mq_->message_count(),mq_->message_bytes()));
ACE_Message_Block *mb;
//Use the forward iterator
ACE_DEBUG((LM_INFO,”\n\nBeginning Forward Read \n”));
ACE_Message_Queue_Iterator
mq_iter_(*mq_); while(mq_iter_.next(mb))
{
mq_iter_.advance();
ACE_DEBUG((LM_INFO,”Read data %d\n”,*mb->rd_ptr()));
}
//Use the reverse iterator
ACE_DEBUG((LM_INFO,”\n\nBeginning Reverse Read \n”));
ACE_Message_Queue_Reverse_Iterator
mq_rev_iter_(*mq_);
while(mq_rev_iter_.next(mb))
{
mq_rev_iter_.advance();
ACE_DEBUG((LM_INFO,”Read data %d\n”,*mb->rd_ptr()));
}
}
void dequeue_all()
{
ACE_DEBUG((LM_INFO,”\n\nBeginning DQ \n”));
ACE_DEBUG((LM_INFO,”No. of Messages on Q:%d Bytes on Q:%d \n”,
mq_->message_count(),mq_->message_bytes()));
ACE_Message_Block *mb;
//dequeue the head of the message queue until no more messages
//are left
for(int i=0;i
{
mq_->dequeue_head(mb);
ACE_DEBUG((LM_INFO,”DQ’d data %d\n”,*mb->rd_ptr()));
}
}
private:
Args *args_;
ACE_Message_Queue
int no_msgs_;
};
int main(int argc, char* argv[])
{
QTest test(argc,argv);
if(test.start_test()<0)
ACE_DEBUG((LM_ERROR,”Program failure \n”));
}
这个例子使用
ACE_Get_Opt类(更多关于这个工具类的信息见附录)来获取低水位标和高水位标(在Args类中)。使用low_water_mark()和high_water_mark()访问函数可对它们进行设置。除此而外,还有一个read_all()方法使用ACE_Message_Queue_Iterator和ACE_Message_Queue_Reverse_Iterator来向前读和反向读。
