#include "stdafx.h"
#include<Winsock2.h>
#include<iostream>
#include<assert.h>//here
#include<MSWSock.h>
#include<vector>
using namespace std;
// Cleanup helpers. Each macro expands to a braced block and evaluates its
// argument more than once, so pass a plain variable, not an expression with
// side effects.

// Deletes the object x points to (scalar delete) when x is not NULL, then sets x to NULL.
#define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
// Calls CloseHandle on x unless x is NULL or INVALID_HANDLE_VALUE, then sets x to NULL.
#define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
// Calls closesocket on x unless x is already INVALID_SOCKET, then sets x to INVALID_SOCKET.
#define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
// Erases the element at zero-based position `num` from `vec`.
//
// The container is taken by reference: taking it by value would erase from a
// temporary copy and leave the caller's container untouched.
// A negative or past-the-end `num` is ignored instead of producing an invalid
// iterator.
template<typename t>
void RemoveAt(t& vec, int num)
{
    if (num < 0 || static_cast<typename t::size_type>(num) >= vec.size())
    {
        return;
    }
    vec.erase(vec.begin() + num);
}
// Tags a PER_IO_CONTEXT with the kind of overlapped operation it carries, so
// the completion handler can tell which request has finished.
typedef enum _OPERATION_TYPE
{
    ACCEPT_POSTED = 0,  // an accept request was posted
    SEND_POSTED   = 1,  // a send request was posted
    RECV_POSTED   = 2,  // a receive request was posted
    NULL_POSTED   = 3   // initial state: nothing has been posted yet
} OPERATION_TYPE;
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped;
SOCKET m_sockAccept;
WSABUF m_wsaBuf;
char m_szBuffer[8192];
OPERATION_TYPE m_OpType;
_PER_IO_CONTEXT()
{
ZeroMemory(&m_Overlapped, sizeof(m_Overlapped));
ZeroMemory(m_szBuffer, 8192);
m_sockAccept = INVALID_SOCKET;
m_wsaBuf.buf = m_szBuffer;
m_wsaBuf.len =8192;
m_OpType = NULL_POSTED;
}
~_PER_IO_CONTEXT()
{
if (m_sockAccept != INVALID_SOCKET)
{
closesocket(m_sockAccept);
m_sockAccept = INVALID_SOCKET;
}
}
void ResetBuffer()
{
ZeroMemory(m_szBuffer, 8192);
}
} PER_IO_CONTEXT;
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET m_Socket;
SOCKADDR_IN m_ClientAddr;
vector<_PER_IO_CONTEXT*> m_arrayIoContext;
_PER_SOCKET_CONTEXT()
{
m_Socket = INVALID_SOCKET;
memset(&m_ClientAddr, 0, sizeof(m_ClientAddr));
}
~_PER_SOCKET_CONTEXT()
{
if (m_Socket != INVALID_SOCKET)
{
closesocket(m_Socket);
m_Socket = INVALID_SOCKET;
}
for (int i = 0; i<m_arrayIoContext.size(); i++)
{
delete m_arrayIoContext[i];
}
m_arrayIoContext.clear();
}
_PER_IO_CONTEXT* GetNewIoContext()
{
_PER_IO_CONTEXT* p = new _PER_IO_CONTEXT;
m_arrayIoContext.push_back(p);
return p;
}
void RemoveContext(_PER_IO_CONTEXT* pContext)
{
assert(pContext != NULL);
for (int i = 0; i < m_arrayIoContext.size(); i++)
{
if (pContext == m_arrayIoContext[i])
{
delete pContext;
pContext = NULL;
RemoveAt(m_arrayIoContext, i);
break;
}
}
}
} PER_SOCKET_CONTEXT;
// Server-wide state. The class and its single global instance share the name
// `C`; every function in this file reaches the state through that instance.
class C
{
public:
// Polled by _WorkerThread with WaitForSingleObject; the loop stops once it is signaled.
// NOTE(review): no code in this listing creates this event — confirm it is created elsewhere.
HANDLE m_hShutdownEvent;
// Completion port created in _InitializeIOCP and drained by _WorkerThread.
HANDLE m_hIOCompletionPort;
// Array of worker thread handles allocated in _InitializeIOCP.
HANDLE* m_phWorkerThreads;
// Number of entries in m_phWorkerThreads.
int m_nThreads;
// NOTE(review): presumably the listen address and port; neither is read in this listing.
string m_strIP;
int m_nPort;
// Entered around accesses to m_arrayClientContext (and around IO-context removal in _DoSend).
// NOTE(review): no InitializeCriticalSection call is visible in this listing.
CRITICAL_SECTION m_csContextList;
// Socket contexts registered through _AddToContextList.
vector<PER_SOCKET_CONTEXT*> m_arrayClientContext;
// Context whose m_Socket is passed to AcceptEx as the listening socket.
PER_SOCKET_CONTEXT* m_pListenContext;
// Extension function pointers called by _PostAccept and _DoAccpet.
// NOTE(review): nothing in this listing assigns them — they must be loaded before use.
LPFN_ACCEPTEX m_lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
}C;
int _GetNoOfProcessors()
{
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors + 1;
}
// Probes socket `s` with a zero-byte send.
// Returns false when send() reports -1, true otherwise.
bool _IsSocketAlive(SOCKET s)
{
    const int sendResult = send(s, "", 0, 0);
    return sendResult != -1;
}
// Posts an overlapped send of the NUL-terminated string `buf` on
// pIoContext->m_sockAccept and tags the context as SEND_POSTED.
//
// The payload is copied into the IO context's own buffer, which stays valid
// until the context is destroyed. A separately allocated copy would have no
// owner to free it once the send completes.
// The caller must pass a context with no other operation outstanding: the
// context's OVERLAPPED and buffer are reused for this send.
//
// Returns true when the send completed immediately or is pending. Returns
// false when an argument is NULL, when the payload is longer than the
// context buffer, or when WSASend fails with an error other than
// WSA_IO_PENDING.
bool _PostSend(PER_IO_CONTEXT* pIoContext, char* buf)
{
    if (pIoContext == NULL || buf == NULL)
    {
        return false;
    }

    const size_t payloadLen = strlen(buf);
    if (payloadLen > sizeof(pIoContext->m_szBuffer))
    {
        return false;
    }
    memcpy(pIoContext->m_szBuffer, buf, payloadLen);

    // A local WSABUF is used so that m_wsaBuf keeps describing the full
    // buffer for any later receive on this context.
    WSABUF sendBuf;
    sendBuf.buf = pIoContext->m_szBuffer;
    sendBuf.len = static_cast<ULONG>(payloadLen);

    // Clear state left in the OVERLAPPED by a previous operation.
    ZeroMemory(&pIoContext->m_Overlapped, sizeof(pIoContext->m_Overlapped));
    pIoContext->m_OpType = SEND_POSTED;

    DWORD dwBytes = 0;
    const int rc = WSASend(pIoContext->m_sockAccept, &sendBuf, 1, &dwBytes, 0,
                           &pIoContext->m_Overlapped, NULL);
    if (SOCKET_ERROR == rc && WSA_IO_PENDING != WSAGetLastError())
    {
        return false;
    }
    return true;
}
// Handles a finished send: removes pIoContext from pSocketContext while
// holding the context-list critical section.
void _DoSend(PER_SOCKET_CONTEXT *pSocketContext, PER_IO_CONTEXT *pIoContext)
{
    LPCRITICAL_SECTION pListLock = &C.m_csContextList;
    ::EnterCriticalSection(pListLock);
    pSocketContext->RemoveContext(pIoContext);
    ::LeaveCriticalSection(pListLock);
}
// Prints the text currently held in the IO context's buffer, clears the
// buffer and posts the next overlapped receive on pIoContext->m_sockAccept,
// tagging the context as RECV_POSTED.
//
// Returns true when the receive completed immediately or is pending. Returns
// false when pIoContext is NULL or WSARecv fails with an error other than
// WSA_IO_PENDING.
bool _DoRecv(PER_IO_CONTEXT* pIoContext)
{
    if (pIoContext == NULL)
    {
        return false;
    }

    WSABUF* pRecvBuf = &pIoContext->m_wsaBuf;

    // Print up to the first NUL but never past the end of the buffer: a
    // receive that fills the buffer completely leaves no terminator behind.
    if (pRecvBuf->buf != NULL)
    {
        ULONG textLen = 0;
        while (textLen < pRecvBuf->len && pRecvBuf->buf[textLen] != '\0')
        {
            ++textLen;
        }
        std::cout.write(pRecvBuf->buf, static_cast<std::streamsize>(textLen));
    }

    pIoContext->ResetBuffer();
    // Clear state left in the OVERLAPPED by the operation that just completed.
    ZeroMemory(&pIoContext->m_Overlapped, sizeof(pIoContext->m_Overlapped));
    pIoContext->m_OpType = RECV_POSTED;

    DWORD dwFlags = 0;
    DWORD dwBytes = 0;
    const int rc = WSARecv(pIoContext->m_sockAccept, pRecvBuf, 1, &dwBytes, &dwFlags,
                           &pIoContext->m_Overlapped, NULL);
    if (SOCKET_ERROR == rc && WSA_IO_PENDING != WSAGetLastError())
    {
        return false;
    }
    return true;
}
// Appends pHandleData to the global client list while holding the
// context-list critical section.
void _AddToContextList(PER_SOCKET_CONTEXT *pHandleData)
{
    LPCRITICAL_SECTION pListLock = &C.m_csContextList;
    ::EnterCriticalSection(pListLock);
    C.m_arrayClientContext.insert(C.m_arrayClientContext.end(), pHandleData);
    ::LeaveCriticalSection(pListLock);
}
// Removes pSocketContext from the global client list and deletes it, all
// while holding the context-list critical section. Does nothing when the
// pointer is not in the list.
void _RemoveContext(PER_SOCKET_CONTEXT *pSocketContext)
{
    EnterCriticalSection(&C.m_csContextList);
    std::vector<PER_SOCKET_CONTEXT*>& clients = C.m_arrayClientContext;
    for (std::vector<PER_SOCKET_CONTEXT*>::iterator it = clients.begin(); it != clients.end(); ++it)
    {
        if (*it != pSocketContext)
        {
            continue;
        }
        // Erase directly from the global vector so the list never keeps a
        // pointer to the object deleted below.
        clients.erase(it);
        delete pSocketContext;
        break;
    }
    LeaveCriticalSection(&C.m_csContextList);
}
// Returns true when pSocketContext is currently registered in the global
// client list.
//
// The scan runs under the context-list critical section, like every other
// access to the list, because worker threads add and remove entries. The
// result describes the list at the time of the scan only.
bool IsSocketInContextList(PER_SOCKET_CONTEXT* pSocketContext)
{
    bool found = false;
    EnterCriticalSection(&C.m_csContextList);
    for (std::vector<PER_SOCKET_CONTEXT*>::const_iterator it = C.m_arrayClientContext.begin();
         it != C.m_arrayClientContext.end(); ++it)
    {
        if (*it == pSocketContext)
        {
            found = true;
            break;
        }
    }
    LeaveCriticalSection(&C.m_csContextList);
    return found;
}
void _ClearContextList()
{
EnterCriticalSection(&C.m_csContextList);
for (int i = 0; i<C.m_arrayClientContext.size(); i++)
{
delete C.m_arrayClientContext[i];
}
C.m_arrayClientContext.clear();
LeaveCriticalSection(&C.m_csContextList);
}
// Reacts to a failed completion for pContext.
//   WAIT_TIMEOUT          - removes the context if its socket no longer
//                           answers the liveness probe; returns true.
//   ERROR_NETNAME_DELETED - removes the context; returns true.
//   anything else         - returns false without touching the context.
bool HandleError(PER_SOCKET_CONTEXT *pContext, const DWORD& dwErr)
{
    switch (dwErr)
    {
    case WAIT_TIMEOUT:
        if (!_IsSocketAlive(pContext->m_Socket))
        {
            _RemoveContext(pContext);
        }
        return true;

    case ERROR_NETNAME_DELETED:
        _RemoveContext(pContext);
        return true;

    default:
        return false;
    }
}
// Associates pContext->m_Socket with the global completion port, using the
// context pointer itself as the completion key.
//
// The key is passed as ULONG_PTR, the type CreateIoCompletionPort declares
// for it: a DWORD cast would drop the upper half of the pointer on 64-bit
// builds, and the worker thread would read back a corrupted pointer.
//
// Returns false when pContext is NULL or the association fails.
bool _AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext)
{
    if (pContext == NULL)
    {
        return false;
    }
    const HANDLE hPort = CreateIoCompletionPort(reinterpret_cast<HANDLE>(pContext->m_Socket),
                                                C.m_hIOCompletionPort,
                                                reinterpret_cast<ULONG_PTR>(pContext),
                                                0);
    return hPort != NULL;
}
// Creates a fresh overlapped TCP socket, stores it in
// pAcceptIoContext->m_sockAccept and posts an AcceptEx request for it on the
// listening socket, tagging the context as ACCEPT_POSTED.
//
// The receive area handed to AcceptEx is the context buffer minus the two
// address slots (sizeof(SOCKADDR_IN) + 16 bytes each) that AcceptEx writes
// behind the data.
//
// Returns true when the accept completed immediately or is pending. Returns
// false when the context, the listen context or the AcceptEx pointer is
// missing, when the socket cannot be created, or when AcceptEx fails with an
// error other than WSA_IO_PENDING (the new socket is closed in that case).
bool _PostAccept(PER_IO_CONTEXT* pAcceptIoContext)
{
    // The AcceptEx pointer has to be loaded at run time; refuse to call
    // through it while it is still NULL.
    if (pAcceptIoContext == NULL || C.m_pListenContext == NULL || C.m_lpfnAcceptEx == NULL)
    {
        return false;
    }
    assert(INVALID_SOCKET != C.m_pListenContext->m_Socket);

    const DWORD addrSlotLen = sizeof(SOCKADDR_IN) + 16;
    WSABUF* pAcceptBuf = &pAcceptIoContext->m_wsaBuf;

    // Clear state left in the OVERLAPPED by the previous accept.
    ZeroMemory(&pAcceptIoContext->m_Overlapped, sizeof(pAcceptIoContext->m_Overlapped));
    pAcceptIoContext->m_OpType = ACCEPT_POSTED;

    pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == pAcceptIoContext->m_sockAccept)
    {
        return false;
    }

    DWORD dwBytes = 0;
    const BOOL accepted = C.m_lpfnAcceptEx(C.m_pListenContext->m_Socket,
                                           pAcceptIoContext->m_sockAccept,
                                           pAcceptBuf->buf,
                                           pAcceptBuf->len - (addrSlotLen * 2),
                                           addrSlotLen,
                                           addrSlotLen,
                                           &dwBytes,
                                           &pAcceptIoContext->m_Overlapped);
    if (FALSE == accepted && WSA_IO_PENDING != WSAGetLastError())
    {
        // No accept is outstanding, so the socket created above is unused.
        closesocket(pAcceptIoContext->m_sockAccept);
        pAcceptIoContext->m_sockAccept = INVALID_SOCKET;
        return false;
    }
    return true;
}
bool _DoAccpet(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
{
SOCKADDR_IN* ClientAddr = NULL;
SOCKADDR_IN* LocalAddr = NULL;
int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
C.m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN) + 16) * 2),sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
if (false == _AssociateWithIOCP(pNewSocketContext))
{
RELEASE(pNewSocketContext);
return false;
}
PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
pNewIoContext->m_OpType = RECV_POSTED;
pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
/*if (false == _DoRecv(pNewIoContext))
{
pNewSocketContext->RemoveContext(pNewIoContext);
return false;
}*/
_AddToContextList(pNewSocketContext);
pIoContext->ResetBuffer();
return _PostAccept(pIoContext);
}
void _WorkerThread()
{
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
while (WAIT_OBJECT_0 != WaitForSingleObject(C.m_hShutdownEvent, 0))
{
BOOL bReturn = GetQueuedCompletionStatus(C.m_hIOCompletionPort,&dwBytesTransfered,(PULONG_PTR)&pSocketContext,&pOverlapped,INFINITE);
if (NULL == (DWORD)pSocketContext)
{
break;
}
if (!bReturn)
{
DWORD dwErr = GetLastError();
if (!HandleError(pSocketContext, dwErr))
{
break;
}
continue;
}
else
{
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
if ((0 == dwBytesTransfered) && (RECV_POSTED == pIoContext->m_OpType || SEND_POSTED == pIoContext->m_OpType))
{
_RemoveContext(pSocketContext);
continue;
}
else
{
switch (pIoContext->m_OpType)
{
case ACCEPT_POSTED:
{
_DoRecv(pIoContext);
_DoAccpet(pSocketContext, pIoContext);char k[] = "123456";
cout << C.m_arrayClientContext.size();
}
break;
case RECV_POSTED:
{
_DoRecv(pIoContext);
_PostSend(pIoContext, k);
}
break;
case SEND_POSTED:
{
_DoSend(pSocketContext, pIoContext);
}
break;
default:
break;
}
}
}
}
}
void _InitializeIOCP()
{
C.m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
C.m_nThreads = 2 * _GetNoOfProcessors();
C.m_phWorkerThreads = new HANDLE[C.m_nThreads];
DWORD nThreadID;
for (int i = 0; i < C.m_nThreads; i++)
{
C.m_phWorkerThreads[i] = ::CreateThread(0, 0, (LPTHREAD_START_ROUTINE)_WorkerThread, 0, 0, &nThreadID);
}
}
為什麼作業執行緒在 GetQueuedCompletionStatus 中一直收不到 RECV_POSTED 的完成通知?另外,為什麼 WSASend 發送不出去,回傳 -1,而 WSAGetLastError 回傳 10057?求助各位大佬!
uj5u.com熱心網友回復:
使用 IO 完成埠(IOCP)與執行緒池創建高性能服務器
uj5u.com熱心網友回復:

當年(2010 年)也被 IOCP 坑了好久,現在已經沒啥問題了。多搜集 IOCP 的專案代碼,最好是成熟穩定的,研究個幾天就通了。
uj5u.com熱心網友回復:
這個不錯,非常感謝!
uj5u.com熱心網友回復:
但是我想知道上面代碼出了什麼問題……
uj5u.com熱心網友回復:
WSASend 的用法似乎也沒錯,但就是回傳 10057。
uj5u.com熱心網友回復:
//
// MessageId: WSAENOTCONN
//
// MessageText:
//
// A request to send or receive data was disallowed because the socket is not connected and (when sending on a datagram socket using a sendto call) no address was supplied.
//
#define WSAENOTCONN 10057L
uj5u.com熱心網友回復:
為啥會連接錯誤啊...
uj5u.com熱心網友回復:
是要重建個重疊結構嗎
uj5u.com熱心網友回復:
用 accept 實作成功了,但是 AcceptEx 還是不行。
uj5u.com熱心網友回復:
順便問一下,改用 accept 會在多大程度上影響效率?
uj5u.com熱心網友回復:
https://bbs.csdn.net/topics/392748259
uj5u.com熱心網友回復:
m_lpfnAcceptEx 沒有賦值啊。
// 加載 AcceptEx 函式
void LoadAcceptEx() {
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
WSAIoctl(m_hListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, sizeof(guidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/40500.html
標籤:網絡編程
上一篇:求幫助,非常著急啊!!
