一般而言,接受器和连接器模式会在一起使用。在客户/服务器应用中,服务器通常含有接受器,而客户含有连接器。但是,在特定的应用中,可能需要同时使用接受器和连接器。下面给出这样的应用的一个例子:一条消息被反复发送给对端机器,而与此同时也从远端接受另一消息。因为两种功能必须同时执行,简单的解决方案就是分别在不同的线程里发送和接收消息。
这个例子同时包含接受器和连接器。用户可以在命令行上给出参数,告诉应用它想要扮演服务器还是客户角色。随后应用就相应地调用main_accept()或main_connect()。
例7-5
#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#include ”ace/Thread.h”
//Add our own Reactor singleton
typedef ACE_Singleton
//Create an Acceptor
typedef ACE_Acceptor
//Create a Connector
typedef ACE_Connector
class MyServiceHandler:
public ACE_Svc_Handler
{
public:
//Used by the two threads “globally” to determine their peer stream
static ACE_SOCK_Stream* Peer;
//Thread ID used to identify the threads
ACE_thread_t t_id;
int open(void*)
{
cout<<”Acceptor: received new connection”<
//Register with the reactor to remember this handle
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
//Determine the peer stream and record it globally
MyServiceHandler::Peer=&peer();
//Spawn new thread to send string every second
ACE_Thread::spawn((ACE_THR_FUNC)send_data,0,THR_NEW_LWP,&t_id);
//keep the service handler registered by returning 0 to the
//reactor
return 0;
}
static void* send_data(void*)
{
while(1)
{
cout<<”>>Hello World”<
Peer->send_n(”Hello World”,sizeof(”Hello World”));
//Go to sleep for a second before sending again
ACE_OS::sleep(1);
}
return 0;
}
int handle_input(ACE_HANDLE)
{
char* data= new char[12];
//Check if peer aborted the connection
if(Peer.recv_n(data,12)==0)
{
cout<<”Peer probably aborted connection”);
ACE_Thread::cancel(t_id); //kill sending thread ..
return -1; //de-register from the Reactor.
}
//Show what you got..
cout<<”<< %s\n”,data”<
//keep yourself registered
return 0;
}
};
//Global stream identifier used by both threads
ACE_SOCK_Stream * MyServiceHandler::Peer=0;
void main_accept()
{
ACE_INET_Addr addr(PORT_NO);
Acceptor myacceptor(addr,Reactor::instance());
while(1)
Reactor::instance()->handle_events();
return 0;
}
void main_connect()
{
ACE_INET_Addr addr(PORT_NO,HOSTNAME);
Connector myconnector;
myconnector.connect(my_svc_handler,addr);
while(1)
Reactor::instance()->handle_events();
}
int main(int argc, char* argv[])
{
// Use ACE_Get_Opt to parse and obtain arguments and then call the
// appropriate function for accept or connect.
...
}
这个简单的例子演示怎样联合使用接受器和连接模式来生成服务处理例程,该例程与底层的网络连接建立方法是完全分离的。通过改变相应的设定具体连接器和接受器的模板参数,可以很容易地改用任何其他的底层网络连接建立协议。
下面的部分更为详细地解释接受器和连接器模式实际上是如何工作的。如果你想要调谐服务处理和连接建立策略(其中包括调谐底层具体连接器将要使用的服务处理例程的创建和并发策略,以及连接建立策略),对该模式的进一步了解就是必要的。此外,还有一部分内容解释怎样使用通过ACE_Svc_Handler类自动获得的高级特性。最后,我们说明怎样与接受器和连接器模式一起使用简单的轻量级ACE_Event_Handler。
如上面所提到的,ACE_Svc_Handler类基于ACE_Task(它是ASX流构架的一部分)和ACE_Event_Handler接口类。因而ACE_Svc_Handler既是任务,又是事件处理器。这里我们将简要介绍ACE_Task和ACE_Svc_Handler的功能。
ACE_Task被设计为与ASX流构架一起使用;ASX基于UNIX系统V中的流机制。在设计上ASX与Larry Peterson构建的X-kernel协议工具非常类似。
ASX的基本概念是:到来的消息会被分配给由若干模块(module)组成的流。每个模块在到来的消息上执行某种固定操作,然后把它传递给下一个模块作进一步处理,直到它到达流的末端为止。模块中的实际处理由任务来完成。每个模块通常有两个任务,一个用于处理到来的消息,一个用于处理外出的消息。在构造协议栈时,这种结构是非常有用的。因为每个模块都有固定的简单接口,所创建的模块可以很容易地在不同的应用间复用。例如,设想一个应用,它处理来自数据链路层的消息。程序员会构造若干模块,每个模块分别处理不同层次的协议。因而,他会构造一个单独的模块,进行网络层处理;另一个进行传输层处理;还有一个进行表示层处理。在构造这些模块之后,它们可以(在ASX的帮助下)被“串”成一个流来使用。如果后来创建了一个新的(也许是更好的)传输模块,就可以在不对程序产生任何影响的情况下、在流中替换先前的传输模块。注意模块就像是容纳任务的容器。这些任务是实际的处理元件。一个模块可能需要两个任务,如同在上面的例子中;也可能只需要一个任务。如你可能会猜到的,ACE_Task是模块中被称为任务的处理元件的实现。
每个ACE_Task都有一个内部的消息队列,用以与其他任务、模块或是外部世界通信。如果一个ACE_Task想要发送一条消息给另一个任务,它就将此消息放入目的任务的消息队列中。一旦目的任务收到此消息,它就会立即对它进行处理。
所有ACE_Task都可以作为0个或多个线程来运行。消息可以由多个线程放入ACE_Task的消息队列,或是从中取出,程序员无需担心破坏任何数据结构。因而任务可被用作由多个协作线程组成的系统的基础构建组件。各个线程控制都可封装在ACE_Task中,与其他任务通过发送消息到它们的消息队列来进行交互。
这种体系结构的唯一问题是,任务只能通过消息队列与在同一进程内的其他任务相互通信。ACE_Svc_Handler解决了这一问题,它同时继承自ACE_Task和ACE_Event_Handler,并且增加了一个私有数据流。这种结合使得ACE_Svc_Handler对象能够用作这样的任务:它能够处理事件、并与远地主机的任务间发送和接收数据。
ACE_Task被实现为模板容器,它通过锁定机制来进行实例化。该锁用于保证内部的消息队列在多线程环境中的完整性。如先前所提到的,ACE_Svc_Handler模板容器不仅需要锁定机制,还需要用于与远地任务通信的底层数据流来作为参数。
ACE_Svc_Handler模板通过锁定机制和底层流来实例化,以创建所需的服务处理器。如果应用只是单线程的,就不需要使用锁,可以用ACE_NULL_SYNCH来将其实例化。但是,如果我们想要在多线程应用中使用这个模板,可以这样来进行实例化:
class MySvcHandler:
public ACE_Svc_Handler
{
...
}
在上面的例7-5中,我们使用ACE_Thread包装类和它的静态方法spawn(),创建了单独的线程来发送数据给远地对端。但是,在我们完成此工作时,我们必须定义使用C++ static修饰符的文件范围内的静态send_data()方法。结果当然就是,我们无法访问我们实例化的实际对象的任何数据成员。换句话说,我们被迫使send_data()成员函数成为class-wide的函数,而这并不是我们所想要的。这样做的唯一原因是,ACE_Thread::spawn()只能使用静态成员函数来作为它所创建的线程的入口。另一个有害的副作用是到对端流的引用也必须成为静态的。简而言之,这不是编写这些代码的最好方式。
ACE_Task提供了更好的机制来避免发生这样的问题。每个ACE_Task都有activate()方法,可用于为ACE_Task创建线程。所创建的线程的入口是非静态成员函数svc()。因为svc()是非静态函数,它可以调用任何对象实例专有的数据或成员函数。ACE对程序员隐藏了该机制的所有实现细节。activate()方法有着非常多的用途,它允许程序员创建多个线程,所有这些线程都使用svc()方法作为它们的入口。还可以设置线程优先级、句柄、名字,等等。activate()方法的原型是:
// = Active object activation method.
virtual int activate (long flags = THR_NEW_LWP,
int n_threads = 1,
int force_active = 0,
long priority = ACE_DEFAULT_THREAD_PRIORITY,
int grp_id = -1,
ACE_Task_Base *task = 0,
ACE_hthread_t thread_handles[] = 0,
void *stack[] = 0,
size_t stack_size[] = 0,
ACE_thread_t thread_names[] = 0);
第一个参数flags描述将要创建的线程所希望具有的属性。在线程一章里有详细描述。可用的标志有:
THR_CANCEL_DISABLE, THR_CANCEL_ENABLE, THR_CANCEL_DEFERRED,
THR_CANCEL_ASYNCHRONOUS, THR_BOUND, THR_NEW_LWP, THR_DETACHED,
THR_SUSPENDED, THR_DAEMON, THR_JOINABLE, THR_SCHED_FIFO,
THR_SCHED_RR, THR_SCHED_DEFAULT
第二个参数n_threads指定要创建的线程的数目。第三个参数force_active用于指定是否应该创建新线程,即使activate()方法已在先前被调用过、因而任务或服务处理器已经在运行多个线程。如果此参数被设为false(0),且如果activate()是再次被调用,该方法就会设置失败代码,而不会生成更多的线程。
第四个参数用于设置运行线程的优先级。缺省情况下,或优先级被设为ACE_DEFAULT_THREAD_PRIORITY,方法会使用给定的调度策略(在flags中指定,例如,THR_SCHED_DEFAULT)的“适当”优先级。这个值是动态计算的,并且是在给定策略的最低和最高优先级之间。如果显式地给定一个值,这个值就会被使用。注意实际的优先级值极大地依赖于实现,最好不要直接使用。在线程一章中,可读到更多有关线程优先级的内容。
还可以传入将要创建的线程的线程句柄、线程名和栈空间,以在线程创建过程中使用。如果它们被设置为NULL,它们就不会被使用。但是如果要使用activate创建多个线程,就必须传入线程的名字或句柄,才能有效地对它们进行使用。
下面的例子可以帮助你进一步理解activate方法的使用:
例7-6
#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
class MyServiceHandler; //forward declaration
typedef ACE_Singleton
typedef ACE_Acceptor
class MyServiceHandler:
public ACE_Svc_Handler
{
// The two thread names are kept here
ACE_thread_t thread_names[2];
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));
//Register with the reactor to remember this handler..
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));
//Create two new threads to create and send messages to the
//remote machine.
activate(THR_NEW_LWP,
2, //2 new threads
0, //force active false, if already created don’t try again.
ACE_DEFAULT_THREAD_PRIORITY,//Use default thread priority
-1,
this,//Which ACE_Task object to create? In this case this one.
0,// don’t care about thread handles used
0,// don’t care about where stacks are created
0,//don’t care about stack sizes
thread_names); // keep identifiers in thread_names
//keep the service handler registered with the acceptor.
return 0;
}
void send_message1(void)
{
//Send message type 1
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”Sent message1”));
peer().send_n(”Message1”,LENGTH_MSG_1);
} //end send_message1
int send_message2(void)
{
//Send message type 1
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”Sent Message2”));
peer().send_n(”Message2”,LENGTH_MSG_2);
}//end send_message_2
int svc(void)
{
ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));
if(ACE_Thread::self()== thread_names[0])
while(1) send_message1(); //send message 1s forever
else
while(1) send_message2(); //send message 2s forever
return 0; // keep the com, piler happy.
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));
char* data= new char[13];
//Check if peer aborted the connection
if(peer().recv_n(data,12)==0)
{
printf(”Peer probably aborted connection”);
return -1; //de-register from the Reactor.
}
//Show what you got..
ACE_OS::printf(”<< %s\n”,data);
//keep yourself registered
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(10101);
ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));
//Prepare to accept connections
Acceptor myacceptor(addr,Reactor::instance());
// wait for something to happen.
while(1)
Reactor::instance()->handle_events();
return 0;
}
在此例中,服务处理器在它的open()方法中被登记到反应堆上,随后程序调用activate()来创建2个线程。线程的名字被记录下来,以便在它们调用svc()例程时,我们可以将它们区别开。每个线程发送一条不同类型的消息给远地对端。注意在此例中,线程的创建是完全透明的。此外,因为入口是普通的非静态成员函数,它不需要进行“丑陋的”改动来记住数据成员,比如说对端流。无论何时只要我们需要,我们就可以简单地调用成员函数peer()来获取底层的流。
使用服务处理器中的消息队列机制如前面所提到的,
ACE_Svc_Handler类拥有内建的消息队列。这个消息队列被用作在ACE_Svc_Handler和外部世界之间的主要通信接口。其他任务想要发给该服务处理器的消息被放入它的消息队列中。这些消息会在单独的线程里(通过调用activate()方法创建)处理。随后另一个线程就可以把处理过的消息通过网络发送给另外的远地目的地(很可能是另外的ACE_Svc_Handler)。如先前所提到的,在这种多线程情况下,
ACE_Svc_Handler会自动地使用锁来确保消息队列的完整性。所用的锁即通过实例化ACE_Svc_Handler模板类创建具体服务处理器时所传递的锁。之所用通过这样的方式来传递锁,是因为这样程序员就可以对他的应用进行“调谐”。不同平台上的不同锁定机制有着不同程度的开销。如果需要,程序员可以创建他自己的优化的、遵从ACE的锁接口定义的锁,并将其用于服务处理器。这是程序员通过使用ACE可获得的灵活性的又一范例。重要的是程序员必须意识到,在此服务处理例程中的额外线程将带来显著的锁定开销。为将此开销降至最低,程序员必须仔细地设计他的程序,确保使这样的开销最小化。特别地,上面描述的例子有可能导致过度的开销,在大多数情况下可能并不实用。ACE_Task
,进而是ACE_Svc_Handler(因为服务处理器也是一种任务),具有若干可用于对底层队列进行设置、操作、入队和出队操作的方法。这里我们将只讨论这些方法中的一部分。因为在服务处理器中(通过使用msg_queue()方法)可以获取指向消息队列自身的指针,所以也可以直接调用底层队列(也就是,ACE_Message_Queue)的所有公共方法。(有关消息队列提供的所有方法的更多细节,请参见后面的“消息队列”一章。)如上面所提到的,服务处理器的底层消息队列是
ACE_Message_Queue的实例,它是由服务处理器自动创建的。在大多数情况下,没有必要调用ACE_Message_Queue的底层方法,因为在ACE_Svc_Handler类中已对它们的大多数进行了包装。ACE_Message_Queue是用于使ACE_Message_Block进队或出队的队列。每个ACE_Message_Block都含有指向“引用计数”(reference-counted)的ACE_Data_Block的指针,ACE_Data_Block依次又指向存储在块中的实际数据(见“消息队列”一章)。这使得ACE_Message_Block可以很容易地进行数据共享。ACE_Message_Block
的主要作用是进行高效数据操作,而不带来许多拷贝开销。每个消息块都有一个读指针和写指针。无论何时我们从块中读取时,读指针会在数据块中向前增长。类似地,当我们向块中写的时候,写指针也会向前移动,这很像在流类型系统中的情况。可以通过ACE_Message_Block的构造器向它传递分配器,以用于分配内存(有关Allocator的更多信息,参见“内存管理”一章)。例如,可以使用ACE_Cached_Allocation_Strategy,它预先分配内存并从内存池中返回指针,而不是在需要的时候才从堆中分配内存。这样的功能在需要可预测的性能时十分有用,比如在实时系统中。下面的例子演示怎样使用消息队列的一些功能:
例
7-7#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#include ”ace/Thread.h”
#define NETWORK_SPEED 3
class MyServiceHandler; //forward declaration
typedef ACE_Singleton
typedef ACE_Acceptor
class MyServiceHandler:
public ACE_Svc_Handler
// The message sender and creator threads are handled here.
ACE_thread_t thread_names[2];
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));
//Register with the reactor to remember this handler..
Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));
//Create two new threads to create and send messages to the
//remote machine.
activate(THR_NEW_LWP,
2, //2 new threads
0,
ACE_DEFAULT_THREAD_PRIORITY,
-1,
this,
0,
0,
0,
thread_names); // identifiers in thread_handles
//keep the service handler registered with the acceptor.
return 0;
}
void send_message(void)
{
//Dequeue the message and send it off
ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));
//dequeue the message from the message queue
ACE_Message_Block *mb;
ACE_ASSERT(this->getq(mb)!=-1);
int length=mb->length();
char *data =mb->rd_ptr();
//Send the data to the remote peer
ACE_DEBUG((LM_DEBUG,”%s \n”,data,length));
peer().send_n(data,length);
//Simulate very SLOW network.
ACE_OS::sleep(NETWORK_SPEED);
//release the message block
mb->release();
} //end send_message
int construct_message(void)
{
// A very fast message creation algorithm
// would lead to the need for queuing messages..
// here. These messages are created and then sent
// using the SLOW send_message() routine which is
// running in a different thread so that the message
//construction thread isn’t blocked.
ACE_DEBUG((LM_DEBUG,”(%t)Constructing message::>> ”));
// Create a new message to send
ACE_Message_Block *mb;
char *data=”Hello Connector”;
ACE_NEW_RETURN (mb,ACE_Message_Block (16,//Message 16 bytes long
ACE_Message_Block::MB_DATA,//Set header to data
0,//No continuations.
data//The data we want to send
), 0);
mb->wr_ptr(16); //Set the write pointer.
// Enqueue the message into the message queue
// we COULD have done a timed wait for enqueuing in case
// someone else holds the lock to the queue so it doesn’t block
//forever..
ACE_ASSERT(this->putq(mb)!=-1);
ACE_DEBUG((LM_DEBUG,”Enqueued msg successfully\n”));
}
int svc(void)
{
ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));
//call the message creator thread
if(ACE_Thread::self()== thread_names[0])
while(1) construct_message(); //create messages forever
else
while(1) send_message(); //send messages forever
return 0; // keep the compiler happy.
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));
char* data= new char[13];
//Check if peer aborted the connection
if(peer().recv_n(data,12)==0)
{
printf(”Peer probably aborted connection”);
return -1; //de-register from the Reactor.
}
//Show what you got..
ACE_OS::printf(”<< %s\n”,data);
//keep yourself registered
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(10101);
ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));
//Prepare to accept connections
Acceptor myacceptor(addr,Reactor::instance());
// wait for something to happen.
while(1)
Reactor::instance()->handle_events();
return 0;
}
这个例子演示怎样使用
putq()和getq()方法来在队列中放入或取出消息块。它还演示怎样创建消息块,随后设置它的写指针,并根据它的读指针进行读取。注意消息块中的实际数据的起始位置由消息块的读指针指示。消息块的length()成员函数返回在消息块中存储的底层数据的长度,其中不包括ACE_Message_Block中用于管理目的的部分。另外,我们也显示了怎样使用release()方法来释放消息块(mb)。要了解更多关于如何使用消息块、数据块或是消息队列的信息,请阅读此教程中有关“消息队列”、
ASX框架和其他相关的部分。
