VC驿站

 找回密码
 加入驿站

QQ登录

只需一步,快速开始

搜索
查看: 6941|回复: 6

[原创] IOCP完成端口---带注释-

[复制链接]
密码错误 发表于 2016-5-29 21:35:00 | 显示全部楼层 |阅读模式
今天找到以前的一个U盘,里面有很多以前写的代码,分享以前学习IOCP完成端口写的源码,都是参照学习; 以前发了一个贴是事件选择模型写的完成端口,现在改为 AcceptEx,GetAcceptExSockAddrs异步投递模型




////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 头文件
if !defined(AFX_SOCKETS_H__CD08A6AB_955F_45F0_87DD_D0740503779A__INCLUDED_)
#define AFX_SOCKETS_H__CD08A6AB_955F_45F0_87DD_D0740503779A__INCLUDED_

#include <winsock2.h>
#pragma  comment(lib,"ws2_32.lib")
#include <mswsock.h>
#pragma comment(lib,"mswsock.lib")
#include <MSTcpIP.h>

//============================================================
#include <string>
#include <list>
using namespace std;

namespace Sockets{


        //缓冲器类
        class Buffer
        {
        public:
                Buffer(void);
                virtual ~Buffer(void);
                /*函数说明:
                功能 : 清空缓冲区
                参数 :
                返回值:
                时间 :2014/01/26*/
                void ClearBuffer();
                /*函数说明:
                功能 : 删除缓冲区数据
                参数 :
                1.nSize : 删除的长度
                返回值:
                返回删除后的数据长度
                时间 :2014/01/26*/
                UINT Delete(UINT nSize);
                /*函数说明:
                功能 : 读字节数据
                参数 :
                1.pData : 读出的缓冲区
                2.nSize :读出数据长度
                返回值:
                返回读出数据长度
                时间 :2014/01/26*/
                UINT Read(PBYTE pData, UINT nSize);
                /*函数说明:
                功能 : 写入字节数据
                参数 :
                1.pData : 写入的数据
                2.nSize :数据长度
                返回值:
                成功则返回TRUE,否则返回FALSE.
                时间 :2014/01/26*/
                BOOL Write(PBYTE pData, UINT nSize);
                /*函数说明:
                功能 : 写入字符数据
                参数 :
                1.pData : 写入的数据
                2.nSize :数据长度
                返回值:
                成功则返回TRUE,否则返回FALSE.
                时间 :2014/01/26*/
                BOOL Write(std::string& strData);
                /*函数说明:
                功能 : 获取缓冲区数据长度
                参数 :
                返回值:
                成功则返回数据长度.
                时间 :2014/01/26*/
                UINT GetBufferLen();
                //插入字节数据
                BOOL Insert(PBYTE pData, UINT nSize);
                //插入字符数据
                BOOL Insert(std::string& strData);
                //拷贝数据
                void Copy(Buffer& buffer);
                //获取数据
                PBYTE GetBuffer(UINT nPos = 0);
        protected:

                PBYTE        m_pBase;          //基地址
                PBYTE        m_pPtr;     //偏移地址
                UINT        m_nSize;    //长度
                //内部方法
        protected:
                //重新分配
                UINT ReAllocateBuffer(UINT nRequestedSize);
                //解除分配
                UINT DeAllocateBuffer(UINT nRequestedSize);
                //获取内存大小
                UINT GetMemSize();

        };




        // 传递给Worker线程的退出信号
#define EXIT_CODE                    -1       
        //        //        //缓冲区长度 (1024*8)
#define MAX_BUFFER_LEN        1024*4  
        //标识
#define FLAG_SIZE         3
        // 'P' 'R' 'C' | Len 3+4
#define HDR_SIZE        7


        ///////////////////////////////////////////////////////////////////////////
        //初始化套接字库
        inline BOOL IntSocket(void)
        {
                WSACleanup();
                WSADATA wsaData;
                return WSAStartup(MAKEWORD(2, 2), &wsaData) == 0;
        }

        //创建套接字
        inline        SOCKET        CreateSocket(void)
        {
                return WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        }
        //监听套接字
        inline        BOOL ListenSocket(SOCKET s)
        {
                return listen(s, SOMAXCONN) != SOCKET_ERROR;
        }

        //绑定套接字
        inline        BOOL BindSocket(SOCKET s, UINT nPort)
        {
               

                // 获得本机主机名
                char hostname[MAX_PATH] = { 0 };

                gethostname(hostname, MAX_PATH);
                struct hostent FAR* lpHostEnt = gethostbyname(hostname);
                if (lpHostEnt == NULL){
                        return FALSE;
                }
                // 取得IP地址列表中的第一个为返回的IP(因为一台主机可能会绑定多个IP)
                LPSTR lpAddr = lpHostEnt->h_addr_list[0];
                // 将IP地址转化成字符串形式
                struct in_addr inAddr;
                memmove(&inAddr, lpAddr, 4);
                string m_strIP = string(inet_ntoa(inAddr));



                //        //网络结构
                SOCKADDR_IN                saServer;
                //        // 监听端口
                saServer.sin_port = htons(nPort);//htons将主机的无符号短整形数转换成网络字节顺序
                //        // 监听地址
                saServer.sin_family = AF_INET;//指代协议族,在socket编程中只能是AF_INET
                saServer.sin_addr.s_addr = inet_addr(m_strIP.c_str());// INADDR_ANY;//存储IP地址  INADDR_ANY就是指定地址为0.0.0.0的地址,
                return bind(s, (LPSOCKADDR)&saServer, sizeof(struct sockaddr)) != SOCKET_ERROR;
        }

        //绑定关联完成端口
        inline        BOOL AssociateSocketWithCompletionPort(SOCKET s, HANDLE hCompletionPort, DWORD dwCompletionKey)
        {
                HANDLE h = CreateIoCompletionPort((HANDLE)s, hCompletionPort, dwCompletionKey, 0);
                return h == hCompletionPort;
        }
        //////////////////////////////////////////////////////////////////////////////////




        namespace Server{

               

                // 在完成端口上投递的I/O操作的类型
                typedef enum _OPERATION_TYPE  {

                        IO_Accept = 101,     // 用于初始化,无意义
                        IO_Read,             // 标志投递的是接收操作      
                        IO_Write,          // 标志投递的是发送操作
                        IO_NULL = -1
                }OPERATION_TYPE;

       

                typedef struct PER_IO_OPERATEION_DATA
                {
                        OVERLAPPED m_ol;
                        OPERATION_TYPE                m_ioType;  //IO操作类型
                        //===========================================
                        SOCKET pAcceptsocket;
                        char  *IP; //客户Ip
                        UINT        port; //网络协议结构

                        WSABUF m_wsaBuf;         //接收数据缓冲区
                        BYTE m_szBuffer[MAX_BUFFER_LEN]; //接收数据缓冲区
                        Buffer m_RecvBuffer;  /////接收数据缓冲器
                        //=============================
                        Buffer m_WriteBuffer;  /////发送缓冲器
                        HANDLE                m_hWriteComplete;
                        //发送数据
                        WSABUF         m_wsaOutBuffer;
                        PER_IO_OPERATEION_DATA()
                        {
                                ZeroMemory(&m_ol, sizeof(OVERLAPPED)); //清空数据
                                pAcceptsocket = INVALID_SOCKET;
                                m_wsaBuf.buf = (char*)m_szBuffer;
                                m_wsaBuf.len = sizeof(m_szBuffer);
                                m_ioType = IO_NULL;
                        }
                        ~PER_IO_OPERATEION_DATA()
                        {
                                closesocket(pAcceptsocket);
                        }

                }IO_OPERATEION_DATA;


                //回调函数地址指针
                typedef void (CALLBACK* NOTIFYPROC)(SOCKET Accept, int type, char *buf, int len);



                //Tcp服务器类
                class TcpServer
                {

                public:
                        TcpServer(void); //构造函数
                        virtual~TcpServer(void); //析构函数
                        BOOL Start(UINT nPort, NOTIFYPROC pNotifyProc=NULL);        // 启动服务器
                        //功能 : 关闭服务器
                        void Stop(void);
                        //功能 : 获取服务器状态
                        BOOL IsRunning(){ return m_bIsRunning; };
                        //功能 : 向客户端发送数据
                        BOOL Send(SOCKET s, LPBYTE lpData, UINT nSize);
                protected:
                        BOOL PostAccept(void);
                        void OnAccept(IO_OPERATEION_DATA* pIoContext);
                        //////向完成端口投递接收客户连接请求
                        BOOL  PostRecv(IO_OPERATEION_DATA* pIoContext);
                        //删除客户
                        void RemoveStaleClient(IO_OPERATEION_DATA* pIoContext);
                        void OnClientReading(IO_OPERATEION_DATA* pIoContext, DWORD dwSize = 0);
                        void OnClientWriting(IO_OPERATEION_DATA* pIoContext, DWORD dwSize = 0);

                private:
                        //功能 : 工作线程,负责完成端口IO操作
                        static unsigned __stdcall WorkerThread(LPVOID lpvoid);
                       
                        /*变量说明:
                        功能 :  线程临界区结构对象*/
                         CRITICAL_SECTION m_cs;
                        /*变量说明:
                        功能 :  回调函数地址指针*/
                        NOTIFYPROC                m_pNotifyProc;
                        /*变量说明:
                        功能 : 完成端口句柄*/
                        HANDLE m_hIOCompletionPort;
                        /*变量说明:
                        功能 : 工作者线程的句柄指针*/
                        HANDLE* m_phWorkerThreads;
                        /*变量说明:
                        功能 : 生成的工作线程数量*/
                        short m_ThreadNumber;
                        /*变量说明:
                        功能 :套接字句柄*/
                        SOCKET m_socListen;
                        //服务器状态
                        BOOL         m_bIsRunning;
                        /*变量说明:
                        功能 :  心跳包*/
                        char chOpt;
                        list<IO_OPERATEION_DATA*>  m_arrayClientContext;          // 客户端Socket信息      
                        //包头标示
                        BYTE                m_bPacketFlag[FLAG_SIZE];

                        LPFN_ACCEPTEX                m_lpfnAcceptEx;                // AcceptEx的函数指针
                        LPFN_GETACCEPTEXSOCKADDRS    m_lpfnGetAcceptExSockAddrs;  //GetAcceptExSockaddrs 的函数指针

                       
                };
        }
}



#endif // _MSC_VER > 1000

////////////////////////////////////////////////////////////////////////////////////////////////////////源文件

#include "stdafx.h"
#include "Sockets.h"
#include <process.h>
#include "..\\zlib\\zlib.h"
#pragma comment(lib,"..\\zlib\\zlib.lib")

namespace Sockets{

        //构造函数
        Buffer::Buffer(void)
        {
                m_nSize = 0;
                m_pPtr = m_pBase = NULL;
        }
        //析构函数
        Buffer::~Buffer(void)
        {
                if (m_pBase)
                        VirtualFree(m_pBase, 0, MEM_RELEASE);                //释放内存
        }

        //写数据到缓冲区
        BOOL Buffer::Write(PBYTE pData, UINT nSize)
        {
                //分配内存
                ReAllocateBuffer(nSize + GetBufferLen());

                CopyMemory(m_pPtr, pData, nSize);

                // Advance Pointer
                m_pPtr += nSize;

                return nSize;
        }

        //插入数据到缓冲区中
        BOOL  Buffer::Insert(PBYTE pData, UINT nSize)
        {
                ReAllocateBuffer(nSize + GetBufferLen());

                MoveMemory(m_pBase + nSize, m_pBase, GetMemSize() - nSize);
                CopyMemory(m_pBase, pData, nSize);

                // Advance Pointer
                m_pPtr += nSize;

                return nSize;
        }
        //从缓冲区中读取数据和删除它读什么
        UINT Buffer::Read(PBYTE pData, UINT nSize)
        {
                // Trying to byte off more than ya can chew - eh?
                if (nSize > GetMemSize())
                        return 0;

                // all that we have
                if (nSize > GetBufferLen())
                        nSize = GetBufferLen();


                if (nSize)
                {
                        // Copy over required amount and its not up to us
                        // to terminate the buffer - got that!!!
                        CopyMemory(pData, m_pBase, nSize);

                        // Slide the buffer back - like sinking the data
                        MoveMemory(m_pBase, m_pBase + nSize, GetMemSize() - nSize);

                        m_pPtr -= nSize;
                }

                DeAllocateBuffer(GetBufferLen());

                return nSize;
        }
        ///返回phyical分配的内存缓冲区
        UINT Buffer::GetMemSize()
        {
                return m_nSize;
        }
        //缓冲区的数据长度
        UINT Buffer::GetBufferLen()
        {
                if (m_pBase == NULL)
                        return 0;

                int nSize =
                        m_pPtr - m_pBase;                                  //现有的长度
                return nSize;
        }

        //重新分配缓冲区
        UINT  Buffer::ReAllocateBuffer(UINT nRequestedSize)
        {
                if (nRequestedSize < GetMemSize())
                        return 0;

                // Allocate new size
                UINT nNewSize = (UINT)ceil(nRequestedSize / 1024.0) * 1024;

                // New Copy Data Over
                PBYTE pNewBuffer = (PBYTE)VirtualAlloc(NULL, nNewSize, MEM_COMMIT, PAGE_READWRITE);

                UINT nBufferLen = GetBufferLen();
                CopyMemory(pNewBuffer, m_pBase, nBufferLen);

                if (m_pBase)
                        VirtualFree(m_pBase, 0, MEM_RELEASE);


                // Hand over the pointer
                m_pBase = pNewBuffer;

                // Realign position pointer
                m_pPtr = m_pBase + nBufferLen;

                m_nSize = nNewSize;

                return m_nSize;
        }
        //解除分配
        UINT  Buffer::DeAllocateBuffer(UINT nRequestedSize)
        {
                if (nRequestedSize < GetBufferLen())
                        return 0;

                // Allocate new size
                UINT nNewSize = (UINT)ceil(nRequestedSize / 1024.0) * 1024;

                if (nNewSize < GetMemSize())
                        return 0;

                // New Copy Data Over
                PBYTE pNewBuffer = (PBYTE)VirtualAlloc(NULL, nNewSize, MEM_COMMIT, PAGE_READWRITE);

                UINT nBufferLen = GetBufferLen();
                CopyMemory(pNewBuffer, m_pBase, nBufferLen);

                VirtualFree(m_pBase, 0, MEM_RELEASE);

                // Hand over the pointer
                m_pBase = pNewBuffer;

                // Realign position pointer
                m_pPtr = m_pBase + nBufferLen;

                m_nSize = nNewSize;

                return m_nSize;
        }
        //清除/重置缓冲
        void  Buffer::ClearBuffer()
        {
                // Force the buffer to be empty
                m_pPtr = m_pBase;

                DeAllocateBuffer(1024);
        }
        //写一个字符串缓冲区的末尾
        BOOL  Buffer::Write(std::string& strData)
        {
                int nSize = strData.size();
                return Write((PBYTE)strData.c_str(), nSize);



        }
        //插入一个字符串缓冲区的开始
        BOOL  Buffer::Insert(std::string& strData)
        {
                int nSize = strData.size();
                return Insert((PBYTE)strData.c_str(), nSize);
        }
        //从一个缓冲对象复制到另一个地方
        void  Buffer::Copy(Buffer& buffer)
        {
                int nReSize = buffer.GetMemSize();
                int nSize = buffer.GetBufferLen();
                ClearBuffer();
                ReAllocateBuffer(nReSize);
                m_pPtr = m_pBase + nSize;
                CopyMemory(m_pBase, buffer.GetBuffer(), buffer.GetBufferLen());
        }
        //返回一个指向抵消由物理内存的指针
        PBYTE  Buffer::GetBuffer(UINT nPos)
        {
                return m_pBase + nPos;
        }
        //从缓冲区中删除数据,并删除它
        UINT  Buffer::Delete(UINT nSize)
        {
                // Trying to byte off more than ya can chew - eh?
                if (nSize > GetMemSize())
                        return 0;

                // all that we have
                if (nSize > GetBufferLen())
                        nSize = GetBufferLen();


                if (nSize)
                {
                        // Slide the buffer back - like sinking the data
                        MoveMemory(m_pBase, m_pBase + nSize, GetMemSize() - nSize);

                        m_pPtr -= nSize;
                }

                DeAllocateBuffer(GetBufferLen());

                return nSize;
        }

        namespace Server{

                //监听线程
                unsigned TcpServer::WorkerThread(LPVOID lpvoid)
                {
                        //指针转换
                        TcpServer* pThis = reinterpret_cast<TcpServer*>(lpvoid);
                        printf("WorkerThread:%d->启动\n", GetCurrentThreadId());
                        //接收字节长度
                        DWORD dwIoSize = 0;
                        //完成端口完成键数据
                        ULONG_PTR *PerHandleKey;
                        //IO_OPERATION_DATA* pSocketContext = NULL;
                        //IO重叠输入/输出结构
                        LPOVERLAPPED lpOverlapped = {};

                        //循环处理请求,直到接收到Shutdown信息为止
                        while (pThis->m_bIsRunning){


                                //获取完成端口IO状态
                                BOOL bIORet = GetQueuedCompletionStatus(pThis->m_hIOCompletionPort,//完成端口句柄
                                        &dwIoSize,//数据长度
                                        (PULONG_PTR)&PerHandleKey,//扩展IOIO重叠结构
                                        &lpOverlapped,////IO重叠输入/输出结构
                                        INFINITE);
                                //退出信号到达,退出线程
                                //接收EXIT_CODE退出标志,则直接退出
                                if ((DWORD)PerHandleKey == EXIT_CODE)
                                {
                                        break;
                                }
                                //CONTAINING_RECORD根据结构体中的某成员的地址来推算出该结构体整体的地址
                                IO_OPERATEION_DATA         *pOverlapped = CONTAINING_RECORD(lpOverlapped, IO_OPERATEION_DATA, m_ol);
                       
                                //客户端退出
                                if (dwIoSize == 0 && pOverlapped->m_ioType != IO_Write && pOverlapped->m_ioType != IO_Accept)
                                {
                                        //删除客户
                                        pThis->RemoveStaleClient(pOverlapped);

                                        continue;//到循环首
                                }
                                else{

                                        switch (pOverlapped->m_ioType)
                                        {
                       
                                        case IO_Accept:

                                                pThis->OnAccept(pOverlapped);//客户连接
                                                break;
                                        case IO_Read:

                                                pThis->OnClientReading(pOverlapped, dwIoSize);//接收数据

                                                break;

                                        case IO_Write:

                                                pThis->OnClientWriting(pOverlapped, dwIoSize); //发送数据
               
                                                break;

                                        }

                                }
                               
                        }
                        return 0;
                }


                void TcpServer::OnAccept(IO_OPERATEION_DATA* pIoContext)
                {
                        EnterCriticalSection(&this->m_cs);

                        SOCKADDR_IN *LocalSockaddr = NULL, //本地
                                *RemoteSockaddr = NULL;  //远程地址
                        int         LocalSockaddrLen,
                                RemoteSockaddrLen;

                        //获取远程客户端信息
                        this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, 0, sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (SOCKADDR **)&LocalSockaddr, &LocalSockaddrLen, (SOCKADDR **)&RemoteSockaddr, &RemoteSockaddrLen);

                        pIoContext->IP = inet_ntoa(RemoteSockaddr->sin_addr);
                        pIoContext->port = ntohs(RemoteSockaddr->sin_port);


                        printf("客户连接%s:%d\n", pIoContext->IP, pIoContext->port);

                        //绑定完成端口
                        if (!AssociateSocketWithCompletionPort(pIoContext->pAcceptsocket, m_hIOCompletionPort, (DWORD)pIoContext))
                        {
                                //关联失败 释放内存
                                delete pIoContext;
                                //返回不进行下步操作
                                return;
                        }
                        chOpt = 1;
                        //setsockopt设置套接字状态
                        if (setsockopt(pIoContext->pAcceptsocket, SOL_SOCKET, SO_KEEPALIVE, (char *)&chOpt, sizeof(chOpt)) == SOCKET_ERROR)
                        {
                                //关联失败 释放内存
                                delete pIoContext;
                                //返回不进行下步操作
                                return;
                        }
                        // Set KeepAlive 开启保活机制
                        // 设置超时详细信息
                        tcp_keepalive        klive;
                        klive.onoff = 1; // 启用保活
                        klive.keepalivetime = 1000 * 60 * 3;
                        klive.keepaliveinterval = 1000 * 10; // 重试间隔为10秒 Resend if No-Reply

                        if (WSAIoctl
                                (
                                pIoContext->pAcceptsocket,                          //套接口的句柄
                                SIO_KEEPALIVE_VALS,                           //将进行的操作的控制代码
                                &klive,
                                sizeof(tcp_keepalive),
                                NULL,
                                0,
                                (unsigned long *)&chOpt,
                                0,
                                NULL
                                ) == SOCKET_ERROR)
                        {
                                //关联失败 释放内存
                                delete pIoContext;
                                //返回不进行下步操作
                                return;
                        }
                        //投诉接收数据请求
                        if (this->PostRecv(pIoContext))
                        {
                       

                                this->PostAccept();//投递下一个客户连接请求
                       
                                //char lpData[] = "客户端收到数据";

                        //        this->Send(pIoContext->pAcceptsocket, (LPBYTE)lpData, sizeof(lpData));
                        }
                        else{
                                delete          pIoContext;
                        }


                        LeaveCriticalSection(&this->m_cs);
                       
                }
//投递接收数据请求
                BOOL TcpServer::PostRecv(IO_OPERATEION_DATA* pIoContext)
                {
                        //标志位的指针
                        ULONG                        ulFlags = MSG_PARTIAL;
                        DWORD                        dwNumberOfBytesRecvd;
                        pIoContext->m_ioType = IO_Read;


                        ZeroMemory(&pIoContext->m_ol, sizeof(pIoContext->m_ol)); //清空数据

                        //接收数据
                        UINT nRetVal = WSARecv(pIoContext->pAcceptsocket, //套接字
                                &pIoContext->m_wsaBuf,//数据缓冲区
                                1,//数组中WSABUF结构的数量
                                &dwNumberOfBytesRecvd, ///// 如果接收操作立即完成,这里会返回函数调用所接收到的字节数
                                &ulFlags,//一个指向标志位的指针
                                &pIoContext->m_ol, //绑定”的重叠结构
                                NULL);//完成例程中将会用到的参数,在异步操作中,本参数可设置为NULL。非重叠操作(或非异步操作)忽略
                        //// 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了

                        if ((nRetVal == SOCKET_ERROR) && (WSAGetLastError() != WSA_IO_PENDING))
                        {
                                return FALSE;
                        }
                        return TRUE;

                }
        //删除客户端
                void TcpServer::RemoveStaleClient(IO_OPERATEION_DATA* pIoContext)
                {

                        //遍历
                        list<IO_OPERATEION_DATA*>::iterator it = m_arrayClientContext.begin();
                        while (it != m_arrayClientContext.end())
                        {
                                IO_OPERATEION_DATA* p_obj = *it;
                                if (pIoContext == p_obj)
                                {
                                        printf("客户:%s断开-->\n", pIoContext->IP);
                                        m_arrayClientContext.erase(it);
                                        delete pIoContext;
                                        break;
                                }
                                it++;
                        }
                }

                void TcpServer::OnClientReading(IO_OPERATEION_DATA* pIoContext, DWORD dwSize /* = 0 */)
                {
                        EnterCriticalSection(&m_cs);
                        //写入数据
                        pIoContext->m_RecvBuffer.Write(pIoContext->m_szBuffer, dwSize);

                //        printf("%s\n", (char*)pIoContext->m_RecvBuffer.GetBuffer());

                        //判断数据长度
                        if (pIoContext->m_RecvBuffer.GetBufferLen() > HDR_SIZE)
                        {
                                BYTE PacketFlag[FLAG_SIZE];
                                //复制包头
                                CopyMemory(PacketFlag, pIoContext->m_RecvBuffer.GetBuffer(), sizeof(PacketFlag));
                                //判断包头
                                if (memcmp(m_bPacketFlag, PacketFlag, sizeof(m_bPacketFlag)) != 0)
                                {
                                        printf("收到不合法包!\n");
                                        goto number0; //到尾部
                                }
                                UINT nSize = 0;
                                //复制数据长度
                                CopyMemory(&nSize, pIoContext->m_RecvBuffer.GetBuffer(FLAG_SIZE), sizeof(int));

                                //判断数据接受满
                                if (nSize && (pIoContext->m_RecvBuffer.GetBufferLen()) >= nSize)
                                {
                                        int nUnCompressLength = 0;
                                        //取包头
                                        pIoContext->m_RecvBuffer.Read((PBYTE)PacketFlag, sizeof(PacketFlag));
                                        //取未压缩长度
                                        pIoContext->m_RecvBuffer.Read((PBYTE)&nSize, sizeof(int));
                                        //取压缩后长度
                                        pIoContext->m_RecvBuffer.Read((PBYTE)&nUnCompressLength, sizeof(int));
                                        //减去包头长度
                                        int        nCompressLength = nSize - HDR_SIZE;
                                        //申请内存
                                        PBYTE pData = new BYTE[nCompressLength];
                                        //申请解压数据内存
                                        PBYTE pDeCompressionData = new BYTE[nUnCompressLength];
                                        if (pData == NULL || pDeCompressionData == NULL)
                                                return;
                                        //读取数据
                                        pIoContext->m_RecvBuffer.Read(pData, nCompressLength);

                                        unsigned long        destLen = nUnCompressLength;
                                        //解压数据
                                        int        nRet = uncompress(pDeCompressionData, &destLen, pData, nCompressLength);
                                        //解压成功
                                        if (nRet == Z_OK)
                                        {

                                                if (m_pNotifyProc != NULL)
                                                {
                                                        //调用回调函数
                                                        m_pNotifyProc(pIoContext->pAcceptsocket, 102, (char*)pDeCompressionData, destLen);
                                                }

                                        }
                                        //释放内存
                                        pIoContext->m_RecvBuffer.ClearBuffer();
                                        delete[] pData;
                                        delete[] pDeCompressionData;
                                }
                        }
       
                number0:

                        this->PostRecv(pIoContext);//投递下一接收数据请求

                        pIoContext->m_RecvBuffer.ClearBuffer();
                        LeaveCriticalSection(&m_cs);
                }
                void TcpServer::OnClientWriting(IO_OPERATEION_DATA* pIoContext, DWORD dwSize /* = 0 */)
                {
                EnterCriticalSection(&m_cs);

                        ULONG ulFlags = MSG_PARTIAL;

                        pIoContext->m_WriteBuffer.Delete(dwSize);
                        if (pIoContext->m_WriteBuffer.GetBufferLen() == 0)
                        {
                                pIoContext->m_ioType = IO_NULL;
                                pIoContext->m_WriteBuffer.ClearBuffer();
                                SetEvent(pIoContext->m_hWriteComplete);
                       
                        }
                        else{

                                ZeroMemory(&pIoContext->m_ol, sizeof(OVERLAPPED)); //清空数据
                                pIoContext->m_ioType = IO_Write;
                                pIoContext->m_wsaOutBuffer.buf = (char*)pIoContext->m_WriteBuffer.GetBuffer();
                                pIoContext->m_wsaOutBuffer.len = pIoContext->m_WriteBuffer.GetBufferLen();

                                int nRetVal = WSASend(pIoContext->pAcceptsocket,
                                        &pIoContext->m_wsaOutBuffer,
                                        1,
                                        &pIoContext->m_wsaOutBuffer.len,
                                        ulFlags,
                                        &pIoContext->m_ol,
                                        NULL);

                                if (nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
                                        this->RemoveStaleClient(pIoContext);
                               

                                printf("数据发送!\n");
                        }

                        LeaveCriticalSection(&m_cs);
                }
                BOOL TcpServer::Send(SOCKET s, LPBYTE lpData, UINT nSize)
                {
                        if (m_bIsRunning == FALSE)
                                return FALSE;

                        list<IO_OPERATEION_DATA*>::iterator it = m_arrayClientContext.begin();
                        IO_OPERATEION_DATA *p = NULL;
                        while (it != m_arrayClientContext.end())
                        {
                                IO_OPERATEION_DATA* p_obj = *it;

                                if (s == p_obj->pAcceptsocket)
                                {
                                        p = p_obj;
                                        break;
                                }
                                it++;

                        }
                        if (p == NULL)
                                return FALSE;
               

                        if (nSize > 0)
                        {

                                // Compress data                //压缩率
                                unsigned long        destLen = (double)nSize * 1.001 + 12;
                                LPBYTE                        pDest = new BYTE[destLen];
                                int        nRet = compress(pDest, &destLen, lpData, nSize);

                                if (nRet != Z_OK)
                                {
                                        delete[] pDest;
                                        return FALSE;
                                }
                                //////////////////////////////////////////////////////////////////////////
                                LONG nBufLen = destLen + HDR_SIZE;
                                // 3 bytes 包头
                                p->m_WriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag));
                                // 4 byte 压缩后长度
                                p->m_WriteBuffer.Write((PBYTE)&nBufLen, sizeof(nBufLen));
                                // 4 byte 未压缩前长度
                                p->m_WriteBuffer.Write((PBYTE)&nSize, sizeof(nSize));
                                // Write Data        写入正式数据
                                p->m_WriteBuffer.Write(lpData, nSize);
                                ////释放内存
                                delete[] pDest;
                                //         等待数据就绪信号可用
                                WaitForSingleObject(p->m_hWriteComplete, INFINITE);


                                p->m_ioType = IO_Write;//IO操作类型---发送数据

                                ZeroMemory(&p->m_ol, sizeof(OVERLAPPED)); //清空数据
                                //投递IO请求
                                return PostQueuedCompletionStatus(this->m_hIOCompletionPort, 0, (DWORD)p, &p->m_ol);
                        }
               

                        return FALSE;

                }


                //构造函数
                TcpServer::TcpServer(void)
                {
                        m_pNotifyProc = NULL;
                        m_bIsRunning = FALSE;
                        m_socListen = INVALID_SOCKET;
                        m_hIOCompletionPort = NULL;
                        m_phWorkerThreads = NULL;
                        m_ThreadNumber = 0;
                        ZeroMemory(&m_cs, sizeof(m_cs));
                       
                        BYTE bPacketFlag[] = { 'P', 'R', 'C' };
                        memcpy(m_bPacketFlag, bPacketFlag, sizeof(bPacketFlag));
                }

                //析构函数
                TcpServer::~TcpServer(void)
                {
                        this->Stop();
                }
                void TcpServer::Stop(void)
                {

                }


                //启动服务器
                BOOL TcpServer::Start(UINT nPort, NOTIFYPROC pNotifyProc)
                {
                        m_pNotifyProc = pNotifyProc;

                        if (IntSocket() == FALSE)
                        {
                                return FALSE;
                        }

                        //创建套接字
                        m_socListen = CreateSocket();

                        if (m_socListen == INVALID_SOCKET)
                        {
                                WSACleanup();
                                return FALSE;
                        }
                        // 建立第一个完成端口
                        m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

                        if (m_hIOCompletionPort == NULL)
                        {
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }
                        //绑定完成端口
                        if (NULL == CreateIoCompletionPort((HANDLE)m_socListen, m_hIOCompletionPort, 0, 0))
                        {
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }
                        //绑定套接字
                        if (BindSocket(m_socListen, nPort) == FALSE)
                        {
                                CloseHandle(m_hIOCompletionPort);
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }
                        //监听套接字
                        if (ListenSocket(m_socListen) == FALSE)
                        {
                                CloseHandle(m_hIOCompletionPort);
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }

               
                        // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针
                        GUID GuidAcceptEx = WSAID_ACCEPTEX;
                        GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;


                       //获取AcceptEx函数指针

                        DWORD dwBytes = 0;
                        if (SOCKET_ERROR == WSAIoctl(
                                m_socListen,
                                SIO_GET_EXTENSION_FUNCTION_POINTER,
                                &GuidAcceptEx,
                                sizeof(GuidAcceptEx),
                                &m_lpfnAcceptEx,
                                sizeof(m_lpfnAcceptEx),
                                &dwBytes,
                                NULL,
                                NULL))
                        {
                                CloseHandle(m_hIOCompletionPort);
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }

                        // 获取GetAcceptExSockAddrs函数指针,也是同理
                        if (SOCKET_ERROR == WSAIoctl(
                                m_socListen,
                                SIO_GET_EXTENSION_FUNCTION_POINTER,
                                &GuidGetAcceptExSockAddrs,
                                sizeof(GuidGetAcceptExSockAddrs),
                                &m_lpfnGetAcceptExSockAddrs,
                                sizeof(m_lpfnGetAcceptExSockAddrs),
                                &dwBytes,
                                NULL,
                                NULL))
                        {
                                CloseHandle(m_hIOCompletionPort);
                                closesocket(m_socListen);//关闭套接字
                                WSACleanup();//卸载套接字库
                                return FALSE;
                        }

                        // 初始化线程互斥量
                        InitializeCriticalSection(&m_cs);
                        //获取系统信息
                        SYSTEM_INFO si;
                        GetSystemInfo(&si);
                        UINT nWorkerCnt = si.dwNumberOfProcessors * 2;

                        // 为工作者线程初始化句柄
                        m_phWorkerThreads = new HANDLE[nWorkerCnt];
                        m_bIsRunning = TRUE;

                        DWORD i;
                        UINT  nThreadID;
               
                        for (i = 0; i < nWorkerCnt; i++)
                        {
                                m_phWorkerThreads = (HANDLE)_beginthreadex(NULL,                                        // 安全属性,NULL为默认安全属性
                                        0,                                                //指定线程堆栈的大小。如果为0,则线程堆栈大小和创建它的线程的相同。一般用0
                                        WorkerThread,                     //指定线程函数的地址
                                        (void*) this,                        // 传递给线程的参数的指针
                                        0,                                                //线程初始状态,0:立即运行
                                        &nThreadID);                        //用于记录线程ID的地址

                                m_ThreadNumber++;
                                Sleep(20);
                        }
                       

                        printf("--------------------------------\n");


                        PostAccept();//投递接收客户连接请求


                        return m_bIsRunning;
                }
                //投递接收客户连接请求
                BOOL TcpServer::PostAccept()
                {
                        //创建套接字
                        SOCKET        client = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0, WSA_FLAG_OVERLAPPED);

                        if (client == INVALID_SOCKET)
                        {
                                return FALSE;
                        }
                        // 准备参数
                        IO_OPERATEION_DATA        *pOverlap = new IO_OPERATEION_DATA; //申请单柄数据内存
                        pOverlap->m_ioType = IO_Accept; //IO操作类型  客户连接
                        pOverlap->pAcceptsocket = client; //客户端套接字
                        DWORD dwBytes = 0;
               
                                //投递接收客户连接请求
                        if (FALSE == m_lpfnAcceptEx(this->m_socListen, //本地套接字
                                pOverlap->pAcceptsocket, //客户端套接字
                                pOverlap->m_wsaBuf.buf,//lpOutputBuffer  缓冲区
                                0,//将接收缓冲置为0,令AcceptEx直接返回,防止拒绝服务攻击
                                sizeof(sockaddr_in)+16,
                                sizeof(sockaddr_in)+16,
                                &dwBytes,
                                &pOverlap->m_ol))
                        {
                                if (WSAGetLastError() != WSA_IO_PENDING) //失败
                                {
                                        delete pOverlap; //删除内存
                                       
                                        return FALSE;
                                }
                        }

                        m_arrayClientContext.push_back(pOverlap);//加入链表统一管理

                        return TRUE;
               
                }
        }
}

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?加入驿站

x

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

kuroro 发表于 2016-5-30 01:12:41 | 显示全部楼层
能不能修复gh0st的iocp

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

angeltony 发表于 2016-6-1 14:27:11 | 显示全部楼层
感谢楼主分享

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

zskk1 发表于 2016-7-13 22:32:20 来自手机 | 显示全部楼层
我就笑笑不说话1468420271.09

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

richlingboy 发表于 2016-11-28 17:31:39 | 显示全部楼层
send的时候
  1. //         等待数据就绪信号可用
  2. WaitForSingleObject(p->m_hWriteComplete, INFINITE);
复制代码

这样不会一直阻塞吗,而且m_hWriteComplete也没有createevent,好纠结,望指教

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

李戈 发表于 2017-1-13 23:32:19 | 显示全部楼层
学习,再学习。

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

fashion530 发表于 2017-3-13 21:36:14 | 显示全部楼层
欣赏欣赏欣赏欣赏

发帖求助前要善用论坛搜索功能,那里可能会有你要找的答案;

如果你在论坛求助问题,并且已经从坛友或者管理的回复中解决了问题,请编辑帖子并把分类改成【已解决】

如何回报帮助你解决问题的坛友,一个好办法就是给对方加【热心】【驿站币】,加分不会扣除自己的积分,做一个热心并受欢迎的人!

回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 加入驿站

本版积分规则

关闭

站长提醒上一条 /1 下一条

QQ|小黑屋|手机版|VC驿站 ( 辽ICP备09019393号

返回顶部
x

VC驿站微信公众号cctry2009

GMT+8, 2017-7-28 19:15

Powered by Discuz! X3.3

© 2009-2017 cctry.com

快速回复 返回顶部 返回列表