标题: proactor windows下的线程模型问题
winston





UID 2
精华 2
积分 365
帖子 785
威望 365
金钱 355
ACEJOY 357
阅读权限
注册 2007-12-10
状态 在线
发表于 2007-12-19 11:19  资料  个人空间  短消息  加为好友  QQ
避免环肯定会引入复杂性的。这就要看是否值得,如果确保程序调用顺序,在任何情况下都不会死锁,我倒是觉得无所谓。
避免实体先于接口被释放,COM有实现,可以借鉴。记得应该就是引用计数规则吧。

顶部
[广告]
rosebush





UID 32
精华 0
积分 100
帖子 13
威望 100
金钱 100
ACEJOY 100
阅读权限
注册 2007-12-13
状态 离线
发表于 2007-12-19 11:29  资料  个人空间  短消息  加为好友 
我将echoserver的代码贴出供大家研究测试

首先是 ServerHandler.h

#ifndef _SERVER_HANDER_H_
#define _SERVER_HANDER_H_

#pragma once


#include "ace/Proactor.h"
#include "ace/Asynch_IO.h"
#include "ace/message_block.h"

class ServerHander :public ACE_Service_Handler
{
public:
        ServerHander(void);
        virtual ~ServerHander(void);
        static void SetSleepTime(const DWORD t)
        {
                m_sleepTime = t;
        };

        virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);

protected:
        virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);

        virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);

        virtual void  handle_time_out(const ACE_Time_Value &tv, const void *p);

        void DisplayInfo(ACE_HANDLE h, char* str) const;


private:
        ACE_Asynch_Read_Stream        m_reader;
        ACE_Asynch_Write_Stream m_writer;

        static DWORD                        m_sleepTime;
        time_t                                        m_lastTime;
};


#endif

-----------------------------------------------------------------------------------------------------------------------------------
ServerHander.cpp

#include "StdAfx.h"
#include "ServerHander.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/INET_Addr.h"
#include "ace/SOCK_SEQPACK_Association.h"
#include "ace/OS.h"

#define TIME_OUT 10

DWORD ServerHander::m_sleepTime = 0;

ServerHander::ServerHander(void):m_lastTime(0)
{
}

ServerHander::~ServerHander(void)
{
        //关闭
        if (this->handle() != ace_invalid_handle)               
        {
                //显示客户端连接地址和端口
                displayinfo(this->handle(), " disconnected.");

                ace_proactor::instance()->cancel_timer(*this,1);
                ace_os::shutdown(this->handle(), sd_both);
                ace_os::closesocket( this->handle() );
                this->handle(ace_invalid_handle);
        }
}

//客户端连接
void ServerHander::open(ACE_HANDLE h,
                                                ACE_Message_Block& _mb)
{
        this->handle(h);

        //记录时间
        m_lastTime = ACE_OS::time(NULL);

        //ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0), ACE_Time_Value(TIME_OUT));

        //构造I/O流
        if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
        {
                cout<<"m_reader or m_writer open failed..."<<endl;
                delete this;
                return;
        }

        //显示客户端连接地址和端口        
        DisplayInfo(this->handle(), " connected.");

        ACE_Message_Block* mb = NULL;
        ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));        

        //发起读操作
        if( this->m_reader.read( *mb, mb->space() ) != 0 )
        {
                cout<<"m_reader read failed..."<<endl;
                mb->release();
                delete this;
        }
}

//读操作完成
void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
        //记录时间
        m_lastTime = ACE_OS::time(NULL);

        ACE_Message_Block &mb = result.message_block();

        //传输不成功
        if ( (!result.success()) || (result.bytes_transferred() == 0) )
        {
                cout<<"Read failed..."<<endl;

                mb.release();
                delete this;  
        }
        else        //接收完成
        {
                //等待 模拟过载导致的响应速度变慢
                Sleep( m_sleepTime );

                //写回
                //mb.wr_ptr(0);
                //mb.wr_ptr()[-2] = 0x03;
                if (this->m_writer.write( mb, mb.length() ) == -1)
                {
                        cout<<"Server write failed..."<<endl;
                        mb.release();
                }

               
                else        //写回成功,再继续读下一组数据
                {
                        ACE_Message_Block *new_mb = NULL;
                        ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));

                        this->m_reader.read(*new_mb, new_mb->space());

                        cout<<"Read again."<<endl;
                }
        }
}

//写操作完成
void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
        cout<<"Write completed."<<endl;
        //释放消息
        result.message_block().release();
}

//超时
void ServerHander::handle_time_out(const ACE_Time_Value &tv,
                                                                        const void *p)
{
        time_t curTime = ACE_OS::time(NULL);

        if( curTime - m_lastTime > TIME_OUT )
        {
                cout<<"TimeOut"<<endl;
                delete this;
        }
}

//显示信息
void ServerHander::DisplayInfo(ACE_HANDLE h,
                                                           char* str) const
{
        //获取客户端连接地址和端口
        ACE_INET_Addr addr;
        ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
        size_t addr_size=1;
        ass.get_remote_addr(addr);

        cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
}

[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]

顶部
[广告]
rosebush





UID 32
精华 0
积分 100
帖子 13
威望 100
金钱 100
ACEJOY 100
阅读权限
注册 2007-12-13
状态 离线
发表于 2007-12-19 11:30  资料  个人空间  短消息  加为好友 
我将echoserver的代码贴出供大家研究测试2

多线程的proactor部分

proactorTask.h
#ifndef _CPROACTOR_TASK_H_
#define _CPROACTOR_TASK_H_


#pragma once

#include "ace\Task_T.h"
#include "ace\Thread_Semaphore.h"
#include "ace\Proactor.h"
#include "ace\WIN32_Proactor.h"


class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
{
public:
        CProactorTask(void);
        virtual ~CProactorTask(void);

        int Start(const int nMax);
        int Stop(void);
        int Create(void);
        int Release(void);
        virtual int svc(void);

protected:
        ACE_Thread_Semaphore        m_sem;                                //信号量
        ACE_Proactor*                        m_pProactor;                //完成端口对象指针
};


#endif
---------------------------------------------------------------------------------------------------
cpp部分

#include "StdAfx.h"
#include "ProactorTask.h"

CProactorTask::CProactorTask(void)
{
}

CProactorTask::~CProactorTask(void)
{
}


//
//创建完成端口对象
//
int CProactorTask::Create(void)
{
        ACE_WIN32_Proactor *proactor_impl = 0;

        //新建
        ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);

        //关联
        ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);

        //保存
        ACE_Proactor::instance(this->m_pProactor, 1);

        return 0;
}


//
//启动线程池
//
int CProactorTask::Start(const int nMax)        //线程数量
{
        //创建完成端口对象
        Create();

        //创建线程
        this->activate(THR_NEW_LWP, nMax);

        int i;
        //保证所有线程已启动
        for(i = nMax; i>0; i--)
        {
                m_sem.acquire();        //Block the thread until the semaphore count becomes greater than 0, then decrement it.
        }

        cout<<"Start."<<endl;
        return 0;
}


//
//删除线程池
//
int CProactorTask::Stop(void)
{
        ACE_Proactor::end_event_loop();
        this->wait();
        return 0;
}


//
//每个线程调用
//
int CProactorTask::svc(void)
{
        ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));

        //Increment the semaphore by 1
        m_sem.release(1);
        ACE_Proactor::run_event_loop();
        return 0;
}


//
//释放
//
int CProactorTask::Release(void)
{
        ACE_Proactor::close_singleton();
        m_pProactor = 0;

        cout<<"Release."<<endl;
        return 0;
}
--------------------------------------------------------------------------------------------------------

main.cpp部分

// EchoServer.cpp : Defines the entry point for the console application.
//
// 2007.12.6
// Echo Server Proactor模式


#include "Winbase.h"
#include "stdafx.h"

#include "ace/INET_Addr.h"
#include "ace/Asynch_Acceptor.h"
#include "ServerHander.h"
#include "ProactorTask.h"

int _tmain(int argc, _TCHAR* argv[])
{
        cout<<"******* Echo Server *******"<<endl<<endl;

        //获得CPU数量
        SYSTEM_INFO sysInfo;
        GetSystemInfo(&sysInfo);
        int threadNum = sysInfo.dwNumberOfProcessors<<1;        // CPU * 2

        //开启线程
        CProactorTask task;
        task.Start( threadNum );

        ACE_Asynch_Acceptor<ServerHander> MyAcceptor;

        ACE_INET_Addr addr(5050);

        if(MyAcceptor.open(addr) == -1)
        {
                cout<<"acceptor open failed..."<<endl;
                return 1;
        }

        cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;

        DWORD sleepTime = 0;

        while(1)
        {
                cin>>sleepTime;
                ServerHander::SetSleepTime(sleepTime);
                cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
        }

        return 0;
}

[ 本帖最后由 winston 于 2007-12-19 12:21 编辑 ]

顶部
[广告]
rosebush





UID 32
精华 0
积分 100
帖子 13
威望 100
金钱 100
ACEJOY 100
阅读权限
注册 2007-12-13
状态 离线
发表于 2007-12-19 11:40  资料  个人空间  短消息  加为好友 
说明

忘了禁用表情了.....抱歉一下
程序只是测试用的,有许多问题,大家可以一起探讨
列出几个写代码中的疑点和自己想到的改进方向:

1.由于显示输出采用的是cout,没有同步处理,在多线程下可能会显示异常
2.TimeOut的处理,由于ACE的time是单独开的一个线程,所以,timer处理和Proactor其他事件可能并发,需要注意
3.资源申请和释放ACE_Message_Block是不停new,delete来着,可以考虑使用内存池,ACE提供了很多分配器,由于windows
下proactor使用的是完成端口,所以发起的完成事件越多,被锁定在物理内存中的页面也就越多,使用memory pool可以有效节省资源,当然对于echo这样的小服务来说还是太复杂了.
4.前面提到的runloader测试问题,在Hander的析构代码中如果调用了closesocket就会影响并发连接的效果,不知道为什么
5.accept的时候可以直接接受数据

顶部
[广告]
 



当前时区 GMT+8, 现在时间是 2010-2-9 10:26
京ICP备06055248号

    本论坛支付平台由支付宝提供
携手打造安全诚信的交易社区 Powered by Discuz! 5.5.0  © 2001-2007 Comsenz Inc.
Processed in 0.063434 second(s), 5 queries , Gzip enabled

清除 Cookies - 联系我们 - ACE Developer - Archiver