개발관련/C&C++

IOCP Socket Client 구현

Diademata 2018. 5. 31. 03:04
반응형

code : https://github.com/EomTaeWook/Cpp-Util/tree/master/Util/Socket


사용예제


#include <Socket\IOCPSocketClient.h>

#pragma comment(lib, "Util_d.lib")

#include<string>

struct Header

{

UINT32 dataSize;

USHORT protocol;

};

struct Packet : Util::Socket::DefaultPacket

{

Header Header;

std::vector<char> Data;

};


class TEST : public Util::Socket::IOCPSocketClient<USHORT, Packet>

{

public:

TEST() {}

virtual ~TEST() {}


public:

void TESTFUNCTION(Packet packet);


// IOCPSocketClient을(를) 통해 상속됨

virtual void OnDisconnected() override

{

}

virtual void OnConnected(Util::Socket::StateObject & stateObject) override

{

}

virtual void OnRecieved(Util::Socket::StateObject & stateObject) override

{

if (stateObject.ReceiveBuffer().Count() >= 6)

{

Header header;

memcpy(&header, &stateObject.ReceiveBuffer().Read(sizeof(Header)).front(), sizeof(Header));

Packet packet;

packet.Header = header;

packet.Data.assign(header.dataSize - sizeof(Header), '\n');

memcpy(&packet.Data.front(), &stateObject.ReceiveBuffer().Read(header.dataSize - sizeof(Header)).front(), header.dataSize - sizeof(Header));

RunCallback(packet.Header.protocol, packet);

}

}

};

void TEST::TESTFUNCTION(Packet packet)

{

//Callback 떨어짐

}


int main()

{

TEST t;

t.BindCallback(1234, std::bind(&TEST::TESTFUNCTION, &t, std::placeholders::_1));

t.Connect("127.0.0.1", 10000);

while (true)

{

Sleep(1000);

}

return 0;

}




IOCPSocketClient.h


#pragma once

#include "NS.h"

#include "IOCPBaseClient.h"

#include <map>

#include "../Common/Trace.h"

NS_SOCKET_BEGIN

template<typename ProtocolType, typename ...Types>

class IOCPSocketClient : public IOCPBaseClient

{

protected:

IOCPSocketClient();

public:

virtual ~IOCPSocketClient();

protected:

std::map<ProtocolType, Util::Common::MulticastDelegate<void, Types...>> _funcMaps;

public:

void BindCallback(ProtocolType protocol, const std::function<void(Types...)>& callback);

void OnCallback(ProtocolType protocol, Types...);

};

template<typename ProtocolType, typename ...Types>

inline IOCPSocketClient<ProtocolType, Types...>::IOCPSocketClient()

{

}

template<typename ProtocolType, typename ...Types>

inline IOCPSocketClient<ProtocolType, Types...>::~IOCPSocketClient()

{

_funcMaps.clear();

}


template<typename ProtocolType, typename ...Types>

inline void IOCPSocketClient<ProtocolType, Types...>::BindCallback(ProtocolType protocol, const std::function<void(Types...)>& callback)

{

if (_funcMaps.find(protocol) == _funcMaps.end())

_funcMaps.insert(std::pair<ProtocolType, Util::Common::MulticastDelegate<void, Types...>>(protocol, std::move(callback)));

else

throw std::exception("An item with the same key has already been Added");

}


template<typename ProtocolType, typename ...Types>

inline void IOCPSocketClient<ProtocolType, Types...>::OnCallback(ProtocolType protocol, Types... params)

{

try

{

auto it = _funcMaps.find(protocol);

if (it != _funcMaps.end())

it->second(std::forward<Types>(params)...);

else

throw std::exception("KeyNotFoundException");

}

catch (const std::exception& ex)

{

Util::Common::Trace::WriteLine(ex.what());

}

catch (...)

{

Util::Common::Trace::WriteLine("OnCallbackException");

}

}


NS_SOCKET_END


IOCPBaseClient.h


#pragma once

#include "NS.h"

#include "StateObject.h"

#include "../Threading/Thread.h"


NS_SOCKET_BEGIN

class IOCPBaseClient

{

private:

static const LONG_PTR _CLOSE_THREAD = -1;

protected:

IOCPBaseClient();

public:

virtual ~IOCPBaseClient();

private:

HANDLE _completionPort;

std::vector<HANDLE> _hWorkerThread;

StateObject _stateObject;

sockaddr_in _iPEndPoint;

UINT _threadSize;

private:

std::unique_ptr<Util::Threading::Thread> _keepAliveThread;

public:

void Init(UINT threadSize = 0);

void Connect(std::string ip, int port, int timeOut = 5000);

bool IsConnect();

void DisConnect();

private:

void KeepAlive(void* state);

void BeginReceive();

int Invoke();

void Stop();

public:

void Send(Util::Socket::IPacket& packet);

protected:

//abstract Method

virtual void OnKeepAlive(Util::Socket::StateObject& stateObject);

virtual void OnDisconnected() = 0;

virtual void OnConnected(Util::Socket::StateObject& stateObject) = 0;

virtual void OnRecieved(Util::Socket::StateObject& stateObject) = 0;

public:

static unsigned int __stdcall Run(void*);

};

inline IOCPBaseClient::IOCPBaseClient() : _threadSize(0), _completionPort(NULL)

{

}

inline IOCPBaseClient::~IOCPBaseClient()

{

Stop();

}

NS_SOCKET_END


IOCPBaseClient.cpp


#include "IOCPBaseClient.h"

#include <WS2tcpip.h>

#include "../Common/Trace.h"

#pragma comment(lib, "Ws2_32.lib")

NS_SOCKET_BEGIN

void IOCPBaseClient::Init(UINT threadSize)

{

_completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

if (_completionPort == INVALID_HANDLE_VALUE)

throw std::exception(std::string("CreateIoCompletionPort Fail : " + std::to_string(GetLastError())).c_str());

SYSTEM_INFO info;

GetSystemInfo(&info);

if (info.dwNumberOfProcessors * 2 < threadSize && threadSize > 0)

threadSize = info.dwNumberOfProcessors;

else

threadSize = info.dwNumberOfProcessors * 2;

for (size_t i = 0; i < threadSize; i++)

_hWorkerThread.push_back((HANDLE)_beginthreadex(0, 0, Run, this, 0, NULL));

_threadSize = threadSize;

}

bool IOCPBaseClient::IsConnect()

{

return _stateObject.Socket() != NULL;

}

void IOCPBaseClient::DisConnect()

{

_stateObject.Close();

}

void IOCPBaseClient::Connect(std::string ip, int port, int timeOut)

{

if (_completionPort == NULL)

Init(_threadSize);

WSADATA wsaData;

if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)

throw std::exception(std::string("WSAStartupError : " + std::to_string(GetLastError())).c_str());


auto socket = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

if (socket == INVALID_SOCKET)

throw std::exception(std::string("SocketCreateError : " + std::to_string(GetLastError())).c_str());


memset(&_iPEndPoint, 0, sizeof(sockaddr_in));

if (inet_pton(AF_INET, ip.c_str(), &_iPEndPoint.sin_addr) != 1)

throw std::exception(std::string("inet_ptonError : " + std::to_string(GetLastError())).c_str());


_iPEndPoint.sin_family = AF_INET;

_iPEndPoint.sin_port = htons(port);


ULONG mode = 1;

::ioctlsocket(socket, FIONBIO, &mode);

if (WSAConnect(socket, (SOCKADDR*)&_iPEndPoint, sizeof(sockaddr_in), NULL, NULL, NULL, NULL) == SOCKET_ERROR)

{

auto error = WSAGetLastError();

if (error == WSAEWOULDBLOCK)

{

TIMEVAL tv;

tv.tv_sec = timeOut / 1000;

tv.tv_usec = 0;

fd_set fdSet, fdError;

FD_ZERO(&fdSet);

FD_ZERO(&fdError);

FD_SET(socket, &fdSet);

FD_SET(socket, &fdError);

::select(0, NULL, &fdSet, &fdError, &tv);

if (FD_ISSET(socket, &fdSet))

{

_stateObject.Socket() = socket;

CreateIoCompletionPort((HANDLE)_stateObject.Socket(), _completionPort, (ULONG_PTR)&_stateObject, 0);

_keepAliveThread = std::make_unique<Util::Threading::Thread>(std::bind(&IOCPBaseClient::KeepAlive, this, std::placeholders::_1), &_stateObject);

_keepAliveThread->Start();

OnConnected(_stateObject);

BeginReceive();

}

else

{

closesocket(socket);

WSACleanup();

throw std::exception(std::string("ConnectFail : " + std::to_string(error)).c_str());

}

}

else

{

closesocket(socket);

WSACleanup();

throw std::exception(std::string("ConnectFail : " + std::to_string(error)).c_str());

}

}

}

void IOCPBaseClient::KeepAlive(void* state)

{

auto stateObject = reinterpret_cast<StateObject*>(state);

if(stateObject == NULL)

_stateObject.Close();

fd_set fdSet;

FD_ZERO(&fdSet);

FD_SET(_stateObject.Socket(), &fdSet);

TIMEVAL tv;

tv.tv_sec = 5;

tv.tv_usec = 1;

while (IsConnect())

{

auto check = fdSet;

auto error = ::select(1, &check, NULL, NULL, &tv);

switch (error)

{

case -1:

Util::Common::Trace::WriteLine(std::string("Error : " + std::to_string(GetLastError())), "KeepAlive");

case 0:

OnKeepAlive(*stateObject);

break;

case 1:

_stateObject.Close();

break;

}

}

}

void IOCPBaseClient::BeginReceive()

{

DWORD flags = 0;

if (WSARecv(_stateObject.Socket(), &_stateObject.WSABuff(), 1, 0, &flags, (LPWSAOVERLAPPED)&_stateObject.ReceiveOverlapped(), NULL) == SOCKET_ERROR)

{

if (WSAGetLastError() != WSA_IO_PENDING)

_stateObject.Close();

}

}

void IOCPBaseClient::Stop()

{

_keepAliveThread.reset();


for (size_t i = 0; i < _hWorkerThread.size(); i++)

PostQueuedCompletionStatus(_completionPort, 0, _CLOSE_THREAD, NULL);

for (size_t i = 0; i < _hWorkerThread.size(); i++)

{

WaitForSingleObject(_hWorkerThread[i], INFINITE);

CloseHandle(_hWorkerThread[i]);

}

_stateObject.Close();

WSACleanup();

CloseHandle(_completionPort);

_completionPort = NULL;

_hWorkerThread.clear();

}

void IOCPBaseClient::Send(Util::Socket::IPacket& packet)

{

_stateObject.Send(packet);

}

int IOCPBaseClient::Invoke()

{

ULONG bytesTrans = 0;

ULONG_PTR stateObject = 0;

Socket::Overlapped* overlapped = nullptr;

while (true)

{

if (!GetQueuedCompletionStatus(_completionPort, &bytesTrans, &stateObject, (LPOVERLAPPED*)&overlapped, INFINITE))

{

auto pHandle = reinterpret_cast<StateObject*>(stateObject);

int error = ::WSAGetLastError();

Common::Trace::WriteLine(std::to_string(error), "Error");

switch (error)

{

case ERROR_NETNAME_DELETED:

case ERROR_SEM_TIMEOUT:

pHandle->Close();

_keepAliveThread.reset();

OnDisconnected();

break;

}

continue;

}

if ((LONG_PTR)stateObject == _CLOSE_THREAD && bytesTrans == 0)

break;

auto pHandle = reinterpret_cast<StateObject*>(stateObject);

if (bytesTrans == 0)

{

pHandle->Close();

_keepAliveThread.reset();

OnDisconnected();

continue;

}

if (overlapped->mode == Socket::Mode::Receive)

{

try

{

pHandle->ReceiveBuffer().Append(_stateObject.WSABuff().buf, bytesTrans);

OnRecieved(*pHandle);

}

catch (const std::exception& ex)

{

Common::Trace::WriteLine(ex.what(), "Invoke");

}

BeginReceive();

}

}

return 0;

}

unsigned int __stdcall IOCPBaseClient::Run(void* obj)

{

auto client = reinterpret_cast<IOCPBaseClient*>(obj);

if (client != NULL)

return client->Invoke();

return 0;

}

void IOCPBaseClient::OnKeepAlive(Util::Socket::StateObject& stateObject)

{

stateObject.Send("", 1);

}

NS_SOCKET_END

반응형

'개발관련 > C&C++' 카테고리의 다른 글

FileLogger  (0) 2018.10.02
반복자(iterator)가 포함된 Queue  (0) 2018.09.08
HttpClient  (0) 2018.05.25
MSMQ(MS MessageQueue)  (0) 2018.05.15
Functional 이용한 델리게이트  (0) 2018.04.26