接受器和连接器工厂(也就是
ACE_Connector和ACE_Acceptor)有着非常类似的运行结构。它们的工作可大致划分为三个阶段:
7.4.1 端点或连接初始化阶段
在使用接受器的情况下,应用级程序员可以调用
ACE_Acceptor工厂的open()方法,或是它的缺省构造器(它实际上会调用open()方法),来开始被动侦听连接。当接受器工厂的open()方法被调用时,如果反应堆单体还没有被实例化,open()方法就首先对其进行实例化。随后它调用底层具体接受器的open()方法。于是具体接受器会完成必要的初始化来侦听连接。例如,在使用ACE_SOCK_Acceptor的情况中,它打开socket,将其绑定到用户想要在其上侦听新连接的端口和地址上。在绑定端口后,它将会发出侦听调用。open方法随后将接受器工厂登记到反应堆。因而在接收到任何到来的连接请求时,反应堆会自动回调接受器工厂的handle_input()方法。注意正是因为这一原因,接受器工厂才从ACE_Event_Handler类层次派生;这样它才可以响应ACCEPT事件,并被反应堆自动回调。在使用连接器的情况中,应用程序员调用连接器工厂的
connect()方法或connect_n()方法来发起到对端的连接。除了其他一些选项,这两个方法的参数包括我们想要连接到的远地地址,以及我们是想要同步还是异步地完成连接。我们可以同步或异步地发起NUMBER_CONN个连接:
//Synchronous
OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,
ACE_Synch_Options::synch);
//Asynchronous
OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,
ACE_Synch_Options::asynch);
如果连接请求是异步的,
ACE_Connector会在反应堆上登记自己,等待连接被建立(ACE_Connector也派生自ACE_Event_Handler类层次)。一旦连接被建立,反应堆将随即自动回调连接器。但如果连接请求是同步的,connect()调用将会阻塞,直到连接被建立、或是超时到期为止。超时值可通过改变特定的ACE_Synch_Options来指定。详情请参见参考手册。7.4.2 接受器的服务初始化阶段
在有连接请求在指定的地址和端口上到来时,反应堆自动回调
ACE_Acceptor工厂的handle_input()方法。该方法是一个“模板方法”(
Template Method)。模板方法用于定义一个算法的若干步骤的顺序,并允许改变特定步骤的执行。这种变动是通过允许子类定义这些方法的实现来完成的。(有关模板方法的更多信息见“设计模式”参考指南)。在我们的这个案例中,模板方法将算法定义如下:
- make_svc_handler()
这些方法都可以被重新编写,从而灵活地决定这些操作怎样来实际执行。
这样,
handle_input()将首先调用make_svc_handler()方法,创建适当类型的服务处理器(如我们在上面的例子中所看到的那样,服务处理器的类型由应用程序员在ACE_Acceptor模板被实例化时传入)。在缺省情况下,make_svc_handler()方法只是实例化恰当的服务处理器。但是,make_svc_handler()是一个“桥接”(bridge)方法,可被重载以提供更多复杂功能。(桥接是一种设计模式,它使类层次的接口与实现去耦合。参阅“设计模式”参考文献)。例如,服务处理器可创建为进程级或线程级的单体,或者从库中动态链接,从磁盘中加载,甚或通过更复杂的方式创建,如从数据库中查找并获取服务处理器,并将它装入内存。在服务处理器被创建后,
handle_input()方法调用accept_svc_handler()。该方法将连接“接受进”服务处理器;缺省方式是调用底层具体接受器的accept()方法。在ACE_SOCK_Acceptor被用作具体接受器的情况下,它调用BSD accept()例程来建立连接(“接受”连接)。在连接建立后,连接句柄在服务处理器中被自动设置(接受“进”服务处理器);这个服务处理器是先前通过调用make_svc_handler()创建的。该方法也可被重载,以提供更复杂的功能。例如,不是实际创建新连接,而是“回收利用”旧连接。在我们演示各种不同的接受和连接策略时,将更为详尽地讨论这一点。7.4.3 连接器的服务初始化阶段
应用发出的
connect()方法与接受器工厂中的handle_input()相类似,也就是,它是一个“模板方法”。在我们的这个案例中,模板方法
connect()定义下面一些可被重定义的步骤:
- make_svc_handler()
每一方法都可以被重新编写,从而灵活地决定这些操作怎样来实际执行。
这样,在应用发出
connect()调用后,连接器工厂通过调用make_svc_handler()来实例化恰当的服务处理器,一如在接受器的案例中所做的那样。其缺省行为只是实例化适当的类,并且也可以通过与接受器完全相同的方式重载。进行这样的重载的原因可以与上面提到的原因非常类似。在服务处理器被创建后,
connect()调用确定连接是要成为异步的还是同步的。如果是异步的,在继续下一步骤之前,它将自己登记到反应堆,随后调用connect_svc_handler()方法。该方法的缺省行为是调用底层具体连接器的connect()方法。在使用ACE_SOCK_Connector的情况下,这意味着将适当的选项设置为阻塞或非阻塞式I/O,然后发出BSD connect()调用。如果连接被指定为同步的,connect()调用将会阻塞、直到连接完全建立。在这种情况下,在连接建立后,它将在服务处理器中设置句柄,以与它现在连接到的对端通信(该句柄即是通过在服务处理器中调用peer()方法获得的在流中存储的句柄,见上面的例子)。在服务处理器中设置句柄后,连接器模式将进行到最后阶段:服务处理。如果连接被指定为异步的,在向底层的具体连接器发出非阻塞式
connect()调用后,对connect_svc_handler()的调用将立即返回。在使用ACE_SOCK_Connector的情况中,这意味着发出非阻塞式BSD connect()调用。在连接稍后被实际建立时,反应堆将回调ACE_Connector工厂的handle_output()方法,该方法在通过make_svc_handler()方法创建的服务处理器中设置新句柄。然后工厂将进行到下一阶段:服务处理。与
accept_svc_handler()情况一样,connect_svc_handler()是一个“桥接”方法,可进行重载以提供变化的功能。7.4.4 服务处理
一旦服务处理器被创建、连接被建立,以及句柄在服务处理器中被设置,
ACE_Acceptor的handle_input()方法(或者在使用ACE_Connector的情况下,是handle_output()或connect_svc_handler())将调用activate_svc_handler()方法。该方法将随即启用服务处理器。其缺省行为是调用作为服务处理器的入口的open()方法。如我们在上面的例子中所看到的,在服务处理器开始执行时,open()方法是第一个被调用的方法。是在open()方法中,我们调用activate()方法来创建多个线程控制;并在反应堆上登记服务处理器,这样当新的数据在连接上到达时,它会被自动回调。该方法也是一个“桥接”方法,可被重载以提供更为复杂的功能。特别地,这个重载的方法可以提供更为复杂的并发策略,比如,在另一不同的进程中运行服务处理器。调谐接受器和连接器策略
如上面所提到的,因为使用了可以重载的桥接方法,很容易对接受器和连接器进行调谐。桥接方法允许调谐:
如上所示,调谐是通过重载
ACE_Acceptor或ACE_Connector类的桥接方法来完成的。ACE的设计使得程序员很容易完成这样的重载和调谐。7.5.1 ACE_Strategy_Connector和ACE_Strategy_Acceptor类
为了方便上面所提到的对接受器和连接器模式的调谐方法,
ACE提供了两种特殊的“可调谐”接受器和连接器工厂,那就是ACE_Strategy_Acceptor和ACE_Strategy_Connector。它们和ACE_Acceptor与ACE_Connector非常类似,同时还使用了“策略”模式。策略模式被用于使算法行为与类的接口去耦合。其基本概念是允许一个类(称为
Context Class,上下文类)的底层算法独立于使用该类的客户进行变动。这是通过具体策略类的帮助来完成的。具体策略类封装执行操作的算法或方法。这些具体策略类随后被上下文类用于执行各种操作(上下文类将“工作”委托给具体策略类)。因为上下文类不直接执行任何操作,当需要改变功能时,无需对它进行修改。对上下文类所做的唯一修改是使用另一个具体策略类来执行改变了的操作。(要阅读有关策略模式的更多信息,参见“设计模式”的附录)。在
ACE中,ACE_Strategy_Connector和ACE_Strategy_Acceptor使用若干具体策略类来改变算法,以创建服务处理器,建立连接,以及为服务处理器设置并发方法。如你可能已经猜到的一样,ACE_Strategy_Connector和ACE_Strategy_Acceptor利用了上面提到的桥接方法所提供的可调谐性。使用策略接受器和连接器
在
ACE中已有若干具体的策略类可用于“调谐”策略接受器和连接器。当类被实例化时,它们作为参数被传入策略接受器或连接器。表7-2显示了可用于调谐策略接受器和连接器类的一些类。
|
需要修改 |
具体策略类 |
描述 |
|
创建策略 ( 重定义make_svc_handler()) |
ACE_NOOP_Creation_Strategy |
这个具体策略并不实例化服务处理器,而只是一个空操作。 |
|
ACE_Singleton_Strategy |
保证服务处理器被创建为单体。也就是,所有连接将有效地使用同一个服务处理例程。 | |
|
ACE_DLL_Strategy |
通过从动态链接库中动态链接服务处理器来对它进行创建。 | |
|
连接策略 ( 重定义connect_svc_handler()) |
ACE_Cached_Connect_Strategy |
检查是否有已经连接到特定的远地地址的服务处理器没有在被使用。如果有这样一个服务处理器,就对它进行复用。 |
|
并发策略 ( 重定义activate_svc_handler()) |
ACE_NOOP_Concurrency_Strategy |
一个“无为”( do-nothing)的并发策略。它甚至不调用服务处理器的open()方法。 |
|
ACE_Process_Strategy |
在另外的进程中创建服务处理器,并调用它的 open()方法。 | |
|
ACE_Reactive_Strategy |
先在反应堆上登记服务处理器,然后调用它的 open()方法。 | |
|
ACE_Thread_Strategy |
先调用服务处理器的 open()方法,然后调用它的activate()方法,以让另外的线程来启动服务处理器的svc()方法。 |
表
7-2 用于调谐策略接受器和连接器类的类
下面的例子演示策略接受器和连接器类的使用。
例
7-8#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Acceptor.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Acceptor.h”
#define PORT_NUM 10101
#define DATA_SIZE 12
//forward declaration
class My_Svc_Handler;
//instantiate a strategy acceptor
typedef ACE_Strategy_Acceptor
//instantiate a concurrency strategy
typedef ACE_Process_Strategy
// Define the Service Handler
class My_Svc_Handler:
public ACE_Svc_Handler
{
private:
char* data;
public:
My_Svc_Handler()
{
data= new char[DATA_SIZE];
}
My_Svc_Handler(ACE_Thread_Manager* tm)
{
data= new char[DATA_SIZE];
}
int open(void*)
{
cout<<”Connection established”<
//Register with the reactor
ACE_Event_Handler::READ_MASK);
return 0;
}
int handle_input(ACE_HANDLE)
{
peer().recv_n(data,DATA_SIZE);
ACE_OS::printf(”<< %s\n”,data);
// keep yourself registered with the reactor
return 0;
}
};
int main(int argc, char* argv[])
{
ACE_INET_Addr addr(PORT_NUM);
//Concurrency Strategy
Concurrency_Strategy my_con_strat;
//Instantiate the acceptor
MyAcceptor acceptor(addr, //address to accept on
ACE_Reactor::instance(), //the reactor to use
0,
// don’t care about creation strategy0, // don’t care about connection estb. strategy
&my_con_strat); // use our new process concurrency strategy
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
这个例子基于上面的例
7-2。唯一的不同是它使用了ACE_Strategy_Acceptor,而不是使用ACE_Acceptor;并且它还使用ACE_Process_Strategy作为服务处理器的并发策略。这种并发策略保证一旦连接建立后,服务处理器在单独的进程中被实例化。如果在特定服务上的负载变得过于繁重,使用ACE_Process_Strategy可能是一个好主意。但是,在大多数情况下,使用ACE_Process_Strategy会过于昂贵,而ACE_Thread_Strategy可能是更好的选择。使用ACE_Cached_Connect_Strategy进行连接缓存
在许多应用中,客户会连接到服务器,然后重新连接到同一服务器若干次;每次都要建立连接,执行某些工作,然后挂断连接(比如像在
Web客户中所做的那样)。不用说,这样做是非常低效而昂贵的,因为连接建立和挂断是非常昂贵的操作。在这样的情况下,连接者可以采用一种更好的策略:“记住”老连接并保持它,直到确定客户不会再重新建立连接为止。ACE_Cached_Connect_Strategy就提供了这样一种缓存策略。这个策略对象被ACE_Strategy_Connector用于提供基于缓存的连接建立。如果一个连接已经存在,ACE_Strategy_Connector将会复用它,而不是创建新的连接。当客户试图重新建立连接到先前已经连接的服务器时,
ACE_Cached_Connect_Strategy确保对老的连接和服务处理器进行复用,而不是创建新的连接和服务处理器。因而,实际上,ACE_Cached_Connect_Strategy不仅管理连接建立策略,它还管理服务处理器创建策略。因为在此例中,用户不想创建新的服务处理器,我们将ACE_Null_Creation_Strategy传递给ACE_Strategy_Connector。如果连接先前没有建立过,ACE_Cached_Connect_Strategy将自动使用内部的创建策略来实例化适当的服务处理器,它是在这个模板类被实例化时传入的。这个策略可被设置为用户想要使用的任何一种策略。除此而外,也可以将ACE_Cached_Connect_Strategy自己在其构造器中使用的创建、并发和recycling策略传给它。下面的例子演示这些概念:
例
7-9#include ”ace/Reactor.h”
#include ”ace/Svc_Handler.h”
#include ”ace/Connector.h”
#include ”ace/Synch.h”
#include ”ace/SOCK_Connector.h”
#include ”ace/INET_Addr.h”
#define PORT_NUM 10101
#define DATA_SIZE 16
//forward declaration
class My_Svc_Handler;
//Function prototype
static void make_connections(void *arg);
// Template specializations for the hashing function for the
// hash_map which is used by the cache. The cache is used internally by the
// Cached Connection Strategy . Here we use ACE_Hash_Addr
// as our external identifier. This utility class has already
// overloaded the == operator and the hash() method. (The
// hashing function). The hash() method delegates the work to
// hash_i() and we use the IP address and port to get a
// a unique integer hash value.
size_t ACE_Hash_Addr
{
return addr.get_ip_address () + addr.get_port_number ();
}
//instantiate a strategy acceptor
typedef ACE_Strategy_Connector
STRATEGY_CONNECTOR;
//Instantiate the Creation Strategy
typedef ACE_NOOP_Creation_Strategy
NULL_CREATION_STRATEGY;
//Instantiate the Concurrency Strategy
typedef ACE_NOOP_Concurrency_Strategy
NULL_CONCURRENCY_STRATEGY;
//Instantiate the Connection Strategy
typedef ACE_Cached_Connect_Strategy ACE_SOCK_CONNECTOR, ACE_SYNCH_RW_MUTEX> CACHED_CONNECT_STRATEGY; class My_Svc_Handler: public ACE_Svc_Handler { private: char* data; public: My_Svc_Handler() { data= new char[DATA_SIZE]; } My_Svc_Handler(ACE_Thread_Manager* tm) { data= new char[DATA_SIZE]; } //Called before the service handler is recycled.. int
{
ACE_DEBUG ((LM_DEBUG,
”(%P|%t) recycling Svc_Handler %d with handle %d\n”,
this, this->peer ().get_handle ()));
return 0;
}
int open(void*)
{
ACE_DEBUG((LM_DEBUG,”(%t)Connection established \n”));
//Register the service handler with the reactor
ACE_Reactor::instance()
->register_handler(this,ACE_Event_Handler::READ_MASK);
activate(THR_NEW_LWP|THR_DETACHED);
return 0;
}
int handle_input(ACE_HANDLE)
{
ACE_DEBUG((LM_DEBUG,”Got input in thread: (%t) \n”));
peer().recv_n(data,DATA_SIZE);
ACE_DEBUG((LM_DEBUG,”<< %s\n”,data));
//keep yourself registered with the reactor
return 0;
}
int svc(void)
{
later.//send a few messages and then mark connection as idle so that it can
// be recycled
ACE_DEBUG((LM_DEBUG,”Started the service routine \n”));
for(int i=0;i<3;i++)
{
ACE_DEBUG((LM_DEBUG,”(%t)>>Hello World\n”));
ACE_OS::fflush(stdout);
peer().send_n(”Hello World”,sizeof(”Hello World”));
}
//Mark the service handler as being idle now and let the
//other threads reuse this connection
this->idle(1);
//Wait for the thread to die
this->thr_mgr()->wait();
return 0;
}
};
ACE_INET_Addr *addr;
int main(int argc, char* argv[])
{
addr= new ACE_INET_Addr(PORT_NUM,argv[1]);
//Creation Strategy
NULL_CREATION_STRATEGY creation_strategy;
//Concurrency Strategy
NULL_CONCURRENCY_STRATEGY concurrency_strategy;
//Connection Strategy
CACHED_CONNECT_STRATEGY caching_connect_strategy;
//instantiate the connector
STRATEGY_CONNECTOR connector(
ACE_Reactor::instance(), //the reactor to use
&creation_strategy,
&caching_connect_strategy,
&concurrency_strategy);
//Use the thread manager to spawn a single thread
//to connect multiple times passing it the address
//of the strategy connector
if(ACE_Thread_Manager::instance()->spawn(
(ACE_THR_FUNC) make_connections,
(void *) &connector,
THR_NEW_LWP) == -1)
ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n%a”, ”client thread spawn failed”));
while(1) /* Start the reactor’s event loop */
ACE_Reactor::instance()->handle_events();
}
//Connection establishment function, tries to establish connections
//to the same server again and re-uses the connections from the cache
void make_connections(void *arg)
{
ACE_DEBUG((LM_DEBUG,”(%t)Prepared to connect \n”));
STRATEGY_CONNECTOR *connector= (STRATEGY_CONNECTOR*) arg;
for (int i = 0; i < 10; i++)
{
}My_Svc_Handler *svc_handler = 0;
// Perform a blocking connect to the server using the Strategy
// Connector with a connection caching strategy. Since we are
// connecting to the same
these calls will return the // same dynamically allocated
for each call. if (connector->connect (svc_handler, *addr) == -1)
{
ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n”, ”connection failed\n”));
return;
}
// Rest for a few seconds so that the connection has been freed up
ACE_OS::sleep (5);
}
在上面的例子中,缓存式连接策略被用于缓存连接。要使用这一策略,需要一点额外的工作:定义
ACE_Cached_Connect_Strategy在内部使用的哈希映射管理器的hash()方法。这个hash()方法用于对服务处理器和ACE_Cached_Connect_Strategy内部使用的连接进行哈希运算,放入缓存映射中。它简单地使用IP地址和端口号的总和作为哈希函数,这也许并不是很好的哈希函数。这个例子比至今为止我们所列举的例子都要复杂一点,所以有理由多进行一点讨论。
我们为
ACE_Strategy_Acceptor使用空操作并发和创建策略。使用空操作创建策略是必须的。如上面所解释的,如果没有使用ACE_NOOP_Creation_Strategy,ACE_Cached_Connection_Strategy将会产生断言失败。但是,在使用ACE_Cached_Connect_Strategy时,任何并发策略都可以和策略接受器一起使用。如上面所提到的,ACE_Cached_Connect_Strategys所用的底层创建策略可以由用户来设置。还可以设置recycling策略。这是在实例化caching_connect_strategy时,通过将所需的创建和recycling策略的对象传给它的构造器来完成的。在这里我们没有这样做,而是使用了缺省的创建和recycling策略。在适当地设置连接器后,我们使用
Thread_Manager来派生新线程,且将make_connections()方法作为线程的入口。该方法使用我们的新的策略连接器来连接到远地站点。在连接建立后,该线程休眠5秒钟,然后使用我们的缓存式连接器来重新创建同样的连接。于是该线程应该在连接器缓存中找到这个连接并复用它。和平常一样,一旦连接建立后,反应堆回调我们的服务处理器(
My_Svc_Handler)。随后My_Svc_Handler的open()方法通过调用My_Svc_Handler的activate()方法来使它成为主动对象。svc()方法随后发送三条消息给远地主机,并通过调用服务处理器的idle()方法将该连接标记为空闲。注意this->thr_mrg_wait()要求线程管理器等待所有在线程管理器中的线程终止。如果你不要求线程管理器等待其它线程,根据在ACE中设定的语义,一旦ACE_Task(在此例中是ACE_Task类型的ACE_Svc_Handler)中的线程终止,ACE_Task对象(在此例中是ACE_My_Svc_Handler)就会被自动删除。如果发生了这种情况,在Cache_Connect_Strategy查找先前缓存的连接时,它就不会如我们期望的那样找到My_Svc_Handler,因为它已经被删除掉了。在
My_Svc_Handler中还重载了ACE_Svc_Handler中的recycle()方法。当有旧连接被ACE_Cache_Connect_Strategy找到时,这个方法就会被自动回调,这样服务处理器就可以在此方法中完成回收利用所特有的操作。在我们的例子中,我们只是打印出在缓存中找到的处理器的this指针的地址。在程序运行时,每次连接建立后所使用的句柄的地址是相同的,从而说明缓存工作正常。
