#ifndef _SERVER_HANDER_H_
#define _SERVER_HANDER_H_
#pragma once
#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/message_block.h"
class ServerHander :public ACE_Service_Handler
{
public:
ServerHander(void);
virtual ~ServerHander(void);
static void SetSleepTime(const DWORD t)
{
m_sleepTime = t;
};
virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);
protected:
virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);
virtual void handle_time_out(const ACE_Time_Value &tv, const void *p);
void DisplayInfo(ACE_HANDLE h, char* str) const;
private:
ACE_Asynch_Read_Stream m_reader;
ACE_Asynch_Write_Stream m_writer;
static DWORD m_sleepTime;
time_t m_lastTime;
};
#endif
-----------------------------------------------------------------------------------------------------------------------------------
ServerHander.cpp
#include "StdAfx.h"
#include "ServerHander.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/OS.h"
#define TIME_OUT 10
DWORD ServerHander::m_sleepTime = 0;
ServerHander::ServerHander(void):m_lastTime(0)
{
}
ServerHander::~ServerHander(void)
{
//关闭
if (this->handle() != ace_invalid_handle)
{
//显示客户端连接地址和端口
displayinfo(this->handle(), " disconnected.");
ace_proactor::instance()->cancel_timer(*this,1);
ace_os::shutdown(this->handle(), sd_both);
ace_os::closesocket( this->handle() );
this->handle(ace_invalid_handle);
}
}
//客户端连接
void ServerHander::open(ACE_HANDLE h,
ACE_Message_Block& _mb)
{
this->handle(h);
//记录时间
m_lastTime = ACE_OS::time(NULL);
//ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0), ACE_Time_Value(TIME_OUT));
//构造I/O流
if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
{
cout<<"m_reader or m_writer open failed..."<<endl;
delete this;
return;
}
//显示客户端连接地址和端口
DisplayInfo(this->handle(), " connected.");
ACE_Message_Block* mb = NULL;
ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));
//发起读操作
if( this->m_reader.read( *mb, mb->space() ) != 0 )
{
cout<<"m_reader read failed..."<<endl;
mb->release();
delete this;
}
}
//读操作完成
void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
//记录时间
m_lastTime = ACE_OS::time(NULL);
ACE_Message_Block &mb = result.message_block();
//传输不成功
if ( (!result.success()) || (result.bytes_transferred() == 0) )
{
cout<<"Read failed..."<<endl;
mb.release();
delete this;
}
else //接收完成
{
//等待 模拟过载导致的响应速度变慢
Sleep( m_sleepTime );
//写回
//mb.wr_ptr(0);
//mb.wr_ptr()[-2] = 0x03;
if (this->m_writer.write( mb, mb.length() ) == -1)
{
cout<<"Server write failed..."<<endl;
mb.release();
}
else //写回成功,再继续读下一组数据
{
ACE_Message_Block *new_mb = NULL;
ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));
this->m_reader.read(*new_mb, new_mb->space());
cout<<"Read again."<<endl;
}
}
}
//写操作完成
void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
cout<<"Write completed."<<endl;
//释放消息
result.message_block().release();
}
//超时
void ServerHander::handle_time_out(const ACE_Time_Value &tv,
const void *p)
{
time_t curTime = ACE_OS::time(NULL);
if( curTime - m_lastTime > TIME_OUT )
{
cout<<"TimeOut"<<endl;
delete this;
}
}
//显示信息
void ServerHander::DisplayInfo(ACE_HANDLE h,
char* str) const
{
//获取客户端连接地址和端口
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
size_t addr_size=1;
ass.get_remote_addr(addr);
cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
}