Board logo

标题: proactor windows下的线程模型问题 [打印本页]

作者: peakzhang    时间: 2007-12-18 23:55     标题: proactor windows下的线程模型问题

windows下实现是用i/o完成端口的,
我的问题是投递的多个异步读写(对同一个文件句柄)是否有可能被
系统同时执行?因为看到的资料都说i/o完成端口可以有多个线程同时
执行异步读写的。
如果可能被同时执行的话,那么操作系统(i/o完成端口)能保证同步吗?
作者: peakzhang    时间: 2007-12-18 23:56

当然可以,不过最好是在读完之前不要投递下一个读,在写完成之前不要投递下一个写,呵呵
作者: rosebush    时间: 2007-12-19 10:11

这个问题我也研究了很久
在多线程模式下使用proactor的主要做法就是在多个线程中调用Proactor的handle_event
而handle_event会阻塞在等待系统消息,如果是完成端口的话就是GetQueuedCompletionStatus这个函数
这样当有事件到达的时候GetQueuedCompletionStatus就会从阻塞状态返回,handle_event就可以继续完成
消息分派,从而回调对应事件的完成处理.从上面的分析可知道如果多线程调用GetQueuedCompletionStatus
当同时到达多个事件时就会发生并发处理,理论上所有的事件都是可能并发的,同一个事件的处理函数也是可
能重入的(多个线程同时执行同一个回调)
作者: winston    时间: 2007-12-19 10:17

确实如此。你说的没错。使用proactor的主要做法就是在多个线程中调用Proactor的处理循环。
一定要注意的是,处理器对象的各个虚方法,都是可能被多线程处理的,如果不注意,很可能导致处理错误。
作者: rosebush    时间: 2007-12-19 10:25

最近在做一个proxy需要将两个端口关联起来,比如A-B两个端口关联起来建立了一个channel,由于A和B之间需要互相调用Send,所以他们需要相互了解(相互拥有对方的指针或者句柄)那么当A关闭的时候应该通知B关闭,这样在资源释放上处理很困难,特别是在多线程的异步环境下,版主有没有什么好的建议啊
作者: winston    时间: 2007-12-19 10:37

我遇见过你说的情况,就是A-B之间需要接口相互使用。有几种办法可以处理的,仅供参考,如果你有更好的办法,请回帖告诉我。
1、参考ACE里面的智能指针,有强弱之分,能够在指针对象析构的时候自动改变状态。
2、使用引用计数,把A / B保护起来,注意得防止A / B正在执行的时候,被另外的线程处理删除,导致崩溃,这个问题几率很低,所以很多人意识不到,出了问题又很难想到和查到。
3、A和B都是单件,永不清除,只有状态变化的差别,比如是否能发送、收取等。
作者: rosebush    时间: 2007-12-19 10:45

智能指针是个不错的选择,但是智能指针是针对内存资源的,那么如果io资源出现并发,比如在A端口上发起了写之前,另一个线程关闭了A端口,会出现什么现象了?我用loadrunner测试用proactor写的echoserver,如果server不主动调用closesocket那么并发100个用户可以很稳定的跑起来,但是如果加上了closesocket,并发100个连接的时候,loadrunner就会在connect的时候超时退出.
作者: rosebush    时间: 2007-12-19 10:48

在JAWS2中,资源的释放是和一次处理请求绑定在一起的,从一个端口接受了一个http请求包放在JAWS_Message_Block类的对象中,这个对象贯穿了一次完整处理周期,只有当一次完整处理结束了,这个对象才会被释放,这个对象的释放也会触发端口资源的释放
作者: winston    时间: 2007-12-19 10:53

嗯。具体问题具体分析。
A / B在相互引用的情况下,非常容易出现“环”,出现环状调用很可能导致死锁发生。这时可以用设计模式的一些技巧,把环解开。
作者: rosebush    时间: 2007-12-19 11:07

设计模式解决环的问题一般是采用抽象一个接口层次出来,让相互直接引用的类改为引用对发的抽象接口,如果采用这个模型
那么接口和实现之间就需要一个很严格的规则来避免实体先于接口被释放,这个倒是可以参考COM的规则.

这样增加了复杂性,是否会引入更多的问题了?
作者: winston    时间: 2007-12-19 11:19

避免环肯定会引入复杂性的。这就要看是否值得,如果确保程序调用顺序,在任何情况下都不会死锁,我倒是觉得无所谓。
避免实体先于接口被释放,COM有实现,可以借鉴。记得应该就是引用计数规则吧。
作者: rosebush    时间: 2007-12-19 11:29     标题: 我将echoserver的代码贴出供大家研究测试

首先是 ServerHandler.h
  1. #ifndef _SERVER_HANDER_H_
  2. #define _SERVER_HANDER_H_

  3. #pragma once


  4. #include "ace/Proactor.h"
  5. #include "ace/Asynch_IO.h"
  6. #include "ace/message_block.h"

  7. class ServerHander :public ACE_Service_Handler
  8. {
  9. public:
  10.         ServerHander(void);
  11.         virtual ~ServerHander(void);
  12.         static void SetSleepTime(const DWORD t)
  13.         {
  14.                 m_sleepTime = t;
  15.         };

  16.         virtual void open(ACE_HANDLE h, ACE_Message_Block& _mb);

  17. protected:
  18.         virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result &result);

  19.         virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result);

  20.         virtual void  handle_time_out(const ACE_Time_Value &tv, const void *p);

  21.         void DisplayInfo(ACE_HANDLE h, char* str) const;


  22. private:
  23.         ACE_Asynch_Read_Stream        m_reader;
  24.         ACE_Asynch_Write_Stream m_writer;

  25.         static DWORD                        m_sleepTime;
  26.         time_t                                        m_lastTime;
  27. };


  28. #endif

  29. -----------------------------------------------------------------------------------------------------------------------------------
  30. ServerHander.cpp

  31. #include "StdAfx.h"
  32. #include "ServerHander.h"
  33. #include "ace/OS_NS_sys_socket.h"
  34. #include "ace/INET_Addr.h"
  35. #include "ace/SOCK_SEQPACK_Association.h"
  36. #include "ace/OS.h"

  37. #define TIME_OUT 10

  38. DWORD ServerHander::m_sleepTime = 0;

  39. ServerHander::ServerHander(void):m_lastTime(0)
  40. {
  41. }

  42. ServerHander::~ServerHander(void)
  43. {
  44.         //关闭
  45.         if (this->handle() != ace_invalid_handle)               
  46.         {
  47.                 //显示客户端连接地址和端口
  48.                 displayinfo(this->handle(), " disconnected.");

  49.                 ace_proactor::instance()->cancel_timer(*this,1);
  50.                 ace_os::shutdown(this->handle(), sd_both);
  51.                 ace_os::closesocket( this->handle() );
  52.                 this->handle(ace_invalid_handle);
  53.         }
  54. }

  55. //客户端连接
  56. void ServerHander::open(ACE_HANDLE h,
  57.                                                 ACE_Message_Block& _mb)
  58. {
  59.         this->handle(h);

  60.         //记录时间
  61.         m_lastTime = ACE_OS::time(NULL);

  62.         //ACE_Proactor::instance()->schedule_timer(*this, 0, ACE_Time_Value(0), ACE_Time_Value(TIME_OUT));

  63.         //构造I/O流
  64.         if( this->m_reader.open(*this) != 0 || this->m_writer.open(*this) != 0 )
  65.         {
  66.                 cout<<"m_reader or m_writer open failed..."<<endl;
  67.                 delete this;
  68.                 return;
  69.         }

  70.         //显示客户端连接地址和端口        
  71.         DisplayInfo(this->handle(), " connected.");

  72.         ACE_Message_Block* mb = NULL;
  73.         ACE_NEW_NORETURN(mb, ACE_Message_Block(1024));        

  74.         //发起读操作
  75.         if( this->m_reader.read( *mb, mb->space() ) != 0 )
  76.         {
  77.                 cout<<"m_reader read failed..."<<endl;
  78.                 mb->release();
  79.                 delete this;
  80.         }
  81. }

  82. //读操作完成
  83. void ServerHander::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
  84. {
  85.         //记录时间
  86.         m_lastTime = ACE_OS::time(NULL);

  87.         ACE_Message_Block &mb = result.message_block();

  88.         //传输不成功
  89.         if ( (!result.success()) || (result.bytes_transferred() == 0) )
  90.         {
  91.                 cout<<"Read failed..."<<endl;

  92.                 mb.release();
  93.                 delete this;  
  94.         }
  95.         else        //接收完成
  96.         {
  97.                 //等待 模拟过载导致的响应速度变慢
  98.                 Sleep( m_sleepTime );

  99.                 //写回
  100.                 //mb.wr_ptr(0);
  101.                 //mb.wr_ptr()[-2] = 0x03;
  102.                 if (this->m_writer.write( mb, mb.length() ) == -1)
  103.                 {
  104.                         cout<<"Server write failed..."<<endl;
  105.                         mb.release();
  106.                 }

  107.                
  108.                 else        //写回成功,再继续读下一组数据
  109.                 {
  110.                         ACE_Message_Block *new_mb = NULL;
  111.                         ACE_NEW_NORETURN(new_mb, ACE_Message_Block(1024));

  112.                         this->m_reader.read(*new_mb, new_mb->space());

  113.                         cout<<"Read again."<<endl;
  114.                 }
  115.         }
  116. }

  117. //写操作完成
  118. void ServerHander::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
  119. {
  120.         cout<<"Write completed."<<endl;
  121.         //释放消息
  122.         result.message_block().release();
  123. }

  124. //超时
  125. void ServerHander::handle_time_out(const ACE_Time_Value &tv,
  126.                                                                         const void *p)
  127. {
  128.         time_t curTime = ACE_OS::time(NULL);

  129.         if( curTime - m_lastTime > TIME_OUT )
  130.         {
  131.                 cout<<"TimeOut"<<endl;
  132.                 delete this;
  133.         }
  134. }

  135. //显示信息
  136. void ServerHander::DisplayInfo(ACE_HANDLE h,
  137.                                                            char* str) const
  138. {
  139.         //获取客户端连接地址和端口
  140.         ACE_INET_Addr addr;
  141.         ACE_SOCK_SEQPACK_Association ass = ACE_SOCK_SEQPACK_Association(h);
  142.         size_t addr_size=1;
  143.         ass.get_remote_addr(addr);

  144.         cout<< addr.get_host_addr() <<":"<< addr.get_port_number() <<str<<endl;
  145. }
复制代码

[ 本帖最后由 winston 于 2007-12-19 12:20 编辑 ]
作者: rosebush    时间: 2007-12-19 11:30     标题: 我将echoserver的代码贴出供大家研究测试2

多线程的proactor部分
  1. proactorTask.h
  2. #ifndef _CPROACTOR_TASK_H_
  3. #define _CPROACTOR_TASK_H_


  4. #pragma once

  5. #include "ace\Task_T.h"
  6. #include "ace\Thread_Semaphore.h"
  7. #include "ace\Proactor.h"
  8. #include "ace\WIN32_Proactor.h"


  9. class CProactorTask :public ACE_Task<ACE_MT_SYNCH>
  10. {
  11. public:
  12.         CProactorTask(void);
  13.         virtual ~CProactorTask(void);

  14.         int Start(const int nMax);
  15.         int Stop(void);
  16.         int Create(void);
  17.         int Release(void);
  18.         virtual int svc(void);

  19. protected:
  20.         ACE_Thread_Semaphore        m_sem;                                //信号量
  21.         ACE_Proactor*                        m_pProactor;                //完成端口对象指针
  22. };


  23. #endif
  24. ---------------------------------------------------------------------------------------------------
  25. cpp部分

  26. #include "StdAfx.h"
  27. #include "ProactorTask.h"

  28. CProactorTask::CProactorTask(void)
  29. {
  30. }

  31. CProactorTask::~CProactorTask(void)
  32. {
  33. }


  34. //
  35. //创建完成端口对象
  36. //
  37. int CProactorTask::Create(void)
  38. {
  39.         ACE_WIN32_Proactor *proactor_impl = 0;

  40.         //新建
  41.         ACE_NEW_RETURN(proactor_impl, ACE_WIN32_Proactor, -1);

  42.         //关联
  43.         ACE_NEW_RETURN(this->m_pProactor, ACE_Proactor(proactor_impl, 1 ), -1);

  44.         //保存
  45.         ACE_Proactor::instance(this->m_pProactor, 1);

  46.         return 0;
  47. }


  48. //
  49. //启动线程池
  50. //
  51. int CProactorTask::Start(const int nMax)        //线程数量
  52. {
  53.         //创建完成端口对象
  54.         Create();

  55.         //创建线程
  56.         this->activate(THR_NEW_LWP, nMax);

  57.         int i;
  58.         //保证所有线程已启动
  59.         for(i = nMax; i>0; i--)
  60.         {
  61.                 m_sem.acquire();        //Block the thread until the semaphore count becomes greater than 0, then decrement it.
  62.         }

  63.         cout<<"Start."<<endl;
  64.         return 0;
  65. }


  66. //
  67. //删除线程池
  68. //
  69. int CProactorTask::Stop(void)
  70. {
  71.         ACE_Proactor::end_event_loop();
  72.         this->wait();
  73.         return 0;
  74. }


  75. //
  76. //每个线程调用
  77. //
  78. int CProactorTask::svc(void)
  79. {
  80.         ACE_DEBUG((LM_INFO,ACE_TEXT("svc函数调用!\n")));

  81.         //Increment the semaphore by 1
  82.         m_sem.release(1);
  83.         ACE_Proactor::run_event_loop();
  84.         return 0;
  85. }


  86. //
  87. //释放
  88. //
  89. int CProactorTask::Release(void)
  90. {
  91.         ACE_Proactor::close_singleton();
  92.         m_pProactor = 0;

  93.         cout<<"Release."<<endl;
  94.         return 0;
  95. }
  96. --------------------------------------------------------------------------------------------------------

  97. main.cpp部分

  98. // EchoServer.cpp : Defines the entry point for the console application.
  99. //
  100. // 2007.12.6
  101. // Echo Server Proactor模式


  102. #include "Winbase.h"
  103. #include "stdafx.h"

  104. #include "ace/INET_Addr.h"
  105. #include "ace/Asynch_Acceptor.h"
  106. #include "ServerHander.h"
  107. #include "ProactorTask.h"

  108. int _tmain(int argc, _TCHAR* argv[])
  109. {
  110.         cout<<"******* Echo Server *******"<<endl<<endl;

  111.         //获得CPU数量
  112.         SYSTEM_INFO sysInfo;
  113.         GetSystemInfo(&sysInfo);
  114.         int threadNum = sysInfo.dwNumberOfProcessors<<1;        // CPU * 2

  115.         //开启线程
  116.         CProactorTask task;
  117.         task.Start( threadNum );

  118.         ACE_Asynch_Acceptor<ServerHander> MyAcceptor;

  119.         ACE_INET_Addr addr(5050);

  120.         if(MyAcceptor.open(addr) == -1)
  121.         {
  122.                 cout<<"acceptor open failed..."<<endl;
  123.                 return 1;
  124.         }

  125.         cout<<"Listening on "<< addr.get_port_number() <<"..."<<endl;

  126.         DWORD sleepTime = 0;

  127.         while(1)
  128.         {
  129.                 cin>>sleepTime;
  130.                 ServerHander::SetSleepTime(sleepTime);
  131.                 cout<<"********** Set sleep time to "<<sleepTime<<" ************"<<endl;
  132.         }

  133.         return 0;
  134. }
复制代码

[ 本帖最后由 winston 于 2007-12-19 12:21 编辑 ]
作者: rosebush    时间: 2007-12-19 11:40     标题: 说明

忘了禁用表情了.....抱歉一下
程序只是测试用的,有许多问题,大家可以一起探讨
列出几个写代码中的疑点和自己想到的改进方向:

1.由于显示输出采用的是cout,没有同步处理,在多线程下可能会显示异常
2.TimeOut的处理,由于ACE的time是单独开的一个线程,所以,timer处理和Proactor其他事件可能并发,需要注意
3.资源申请和释放ACE_Message_Block是不停new,delete来着,可以考虑使用内存池,ACE提供了很多分配器,由于windows
下proactor使用的是完成端口,所以发起的完成事件越多,被锁定在物理内存中的页面也就越多,使用memory pool可以有效节省资源,当然对于echo这样的小服务来说还是太复杂了.
4.前面提到的runloader测试问题,在Hander的析构代码中如果调用了closesocket就会影响并发连接的效果,不知道为什么
5.accept的时候可以直接接受数据




欢迎光临 ACE开发者 (http://www.acejoy.com/bbs/) Powered by Discuz! 7.2