개발관련/C&C++

MSMQ(MS MessageQueue)

Diademata 2018. 5. 15. 05:29
반응형

code : https://github.com/EomTaeWook/Cpp-Util/tree/master/Util/System/Message


C#은 워낙 API가 잘되어 있어서 사용법이 간단함.


Receive는 동기 방식으로 호출함.


Receive 쪽은 10만 건(건당 1700 byte)을 전부 읽는 데 C#은 6초 정도, C++은 3초 후반대가 걸림.

Send 쪽은 10만 건(건당 1700 byte)을 보내는 데 C#은 4초 정도, C++은 1.7초 정도 걸림.


API와 사용법을 좀 더 찾아보고, 다른 MQ와 속도도 비교해 본 뒤 속도 개선을 해봐야 할 듯.


C#>> 


namespace MSMQ
{
    // Round-trip benchmark against a local private MSMQ queue.
    // Figures recorded by the author for 100,000 messages of about
    // 1,700 bytes each (690 Korean characters):
    //   Send    {00:00:04.3840442}
    //   Receive {00:00:05.5640045}
    class Program
    {
        // Sends 100,000 copies of a fixed string to the queue, then
        // receives 100,000 messages back synchronously and reads each body.
        static void Main(string[] args)
        {
            string _path = @".\Private$\NotifyQueue";

            // Open the queue when it already exists, otherwise create it.
            // The using-statement disposes the queue even if Send/Receive throws.
            using (MessageQueue _mq = MessageQueue.Exists(_path)
                ? new MessageQueue(_path)
                : MessageQueue.Create(_path))
            {
                // Start timestamp. Nothing in this method reads it —
                // presumably inspected in a debugger to obtain the timings above.
                var now = DateTime.Now;

                for (int i = 0; i < 100000; i++)
                    _mq.Send(@"{ 계절이 지나가는 하늘에는 ...");

                for (int i = 0; i < 100000; i++)
                {
                    var m = _mq.Receive(MessageQueueTransactionType.None);

                    // Touch the body so that it is actually deserialized.
                    var data = m.Body;
                }
            }
        }
    }
}


C++>> API Class Wrapping


NS.h


// Namespace helper macros shared by the MSMQ wrapper headers.
// Guarded both by #pragma once and by a classic include guard.
#pragma once


#ifndef MESSAGE_MSMQ_H

#define MESSAGE_MSMQ_H

// Opens the nested namespaces Util::Message::MS.
#define NS_MESSAGE_MSMQ_BEGIN namespace Util { namespace Message { namespace MS { 

// Closes the three namespaces opened by NS_MESSAGE_MSMQ_BEGIN.
#define NS_MESSAGE_MSMQ_END } } }

// Brings every name of Util::Message::MS into the current scope.
#define USING_MESSAGE_MSMQ using namespace Util::Message::MS;

#endif


MessageQueue.h


#pragma once


#include <Windows.h>

#include <Mq.h>

#include <string>


#include "NS.h"

#include "MSMQEnum.h"

#include "Message.h"

#pragma comment (lib, "Mqrt.lib")


NS_MESSAGE_MSMQ_BEGIN

class MessageQueue

{

private:

HANDLE _hQueue;

MessageQueueMode _mode;

public:

MessageQueue()

{

_hQueue = INVALID_HANDLE_VALUE;

}

virtual ~MessageQueue()

{

MQCloseQueue(_hQueue);

}

public:

void Init(std::wstring pathName, MessageQueueMode mode);

HRESULT Receive(Message& message, int timeOutMillis);

HRESULT Send(Message& message);

bool IsRead();

};

inline bool MessageQueue::IsRead() { return _mode == MessageQueueMode::READ; }

NS_MESSAGE_MSMQ_END



MessageQueue.cpp


#include "MessageQueue.h"


NS_MESSAGE_MSMQ_BEGIN


void MessageQueue::Init(std::wstring pathName, MessageQueueMode mode)

{

if (pathName.size() == 0)

throw "MQ_ERROR_INVALID_PARAMETER";


_mode = mode;

DWORD formatNameBufferLength = 256;

WCHAR formatNameBuffer[256];

auto hr = MQPathNameToFormatName(pathName.c_str(),

formatNameBuffer,

&formatNameBufferLength);


if (FAILED(hr))

{

// Set queue properties.

const int NUMBEROFPROPERTIES = 1;

MQQUEUEPROPS queueProps;

QUEUEPROPID queuePropId[NUMBEROFPROPERTIES];

MQPROPVARIANT queuePropVar[NUMBEROFPROPERTIES];

HRESULT queueStatus[NUMBEROFPROPERTIES];

DWORD propId = 0;

queuePropId[propId] = PROPID_Q_PATHNAME;

queuePropVar[propId].vt = VT_LPWSTR;

queuePropVar[propId].pwszVal = const_cast<wchar_t*>(pathName.c_str());

propId++;


queueProps.cProp = propId;

queueProps.aPropID = queuePropId;

queueProps.aPropVar = queuePropVar;

queueProps.aStatus = queueStatus;


MQCreateQueue(NULL, &queueProps, formatNameBuffer, &formatNameBufferLength);

}

if (_mode == MessageQueueMode::READ)

MQOpenQueue(formatNameBuffer, (int)MessageQueueAccess::RECEIVE_ACCESS, (int)MessageQueueShare::DENY_RECEIVE_SHARE, &_hQueue);

else

MQOpenQueue(formatNameBuffer, (int)MessageQueueAccess::SEND_ACCESS, (int)MessageQueueShare::DENY_NONE, &_hQueue);

}

HRESULT MessageQueue::Receive(Message& message, int timeOutMillis)

{

if (_hQueue == NULL)

throw "Invalid Handle";


// Define an MQMSGPROPS structure.

const int NUMBEROFPROPERTIES = 5;

DWORD propId = 0;


MQMSGPROPS msgProps;

MSGPROPID msgPropId[NUMBEROFPROPERTIES];

MQPROPVARIANT msgPropVar[NUMBEROFPROPERTIES];

HRESULT msgStatus[NUMBEROFPROPERTIES];


// Specify the message properties to be retrieved.  

msgPropId[propId] = PROPID_M_LABEL_LEN;           // Property ID  

msgPropVar[propId].vt = VT_UI4;                   // Type indicator  

msgPropVar[propId].ulVal = MQ_MAX_MSG_LABEL_LEN;  // Length of label  

propId++;


WCHAR labelBuffer[MQ_MAX_MSG_LABEL_LEN];        // Label buffer  

msgPropId[propId] = PROPID_M_LABEL; // Property ID  

msgPropVar[propId].vt = VT_LPWSTR; // Type indicator  

msgPropVar[propId].pwszVal = labelBuffer;       // Label buffer  

propId++;


msgPropId[propId] = PROPID_M_BODY_SIZE;           // Property ID  

msgPropVar[propId].vt = VT_NULL;                  // Type indicator  

propId++;


msgPropId[propId] = PROPID_M_BODY; // Property ID  

msgPropVar[propId].vt = VT_VECTOR | VT_UI1; // Type indicator

msgPropVar[propId].caub.pElems = message.GetBuffer().get(); // Body buffer

msgPropVar[propId].caub.cElems = message.GetBufferSize(); // Buffer size

propId++;


msgPropId[propId] = PROPID_M_BODY_TYPE; // Property ID

msgPropVar[propId].vt = VT_NULL; // Type indicator

propId++;


// Initialize the MQMSGPROPS structure.  

msgProps.cProp = propId; // Number of message properties  

msgProps.aPropID = msgPropId; // IDs of the message properties  

msgProps.aPropVar = msgPropVar; // Values of the message properties  

msgProps.aStatus = msgStatus; // Error reports  


msgPropVar[0].ulVal = MQ_MAX_MSG_LABEL_LEN;

auto hr = MQReceiveMessage(_hQueue,

timeOutMillis,

MQ_ACTION_RECEIVE,

&msgProps,

NULL,

NULL,

NULL,

MQ_NO_TRANSACTION);

if (hr == S_OK)

message.SetReadSize(msgPropVar[2].ulVal);

return hr;

}

/// Sends the whole buffer of `message` as the body of one non-transactional
/// message with an empty label.
///
/// @param message  Source buffer; GetBufferSize() bytes are sent.
/// @return The HRESULT returned by MQSendMessage.
/// @throws const char*  "Invalid Handle" when no queue has been opened.
HRESULT MessageQueue::Send(Message& message)
{
    // The constructor uses INVALID_HANDLE_VALUE, so both "no handle" values
    // have to be rejected here.
    if (_hQueue == NULL || _hQueue == INVALID_HANDLE_VALUE)
        throw "Invalid Handle";

    enum PropIndex
    {
        PROP_BODY,
        PROP_LABEL,
        NUMBEROFPROPERTIES
    };

    MSGPROPID msgPropId[NUMBEROFPROPERTIES];
    MQPROPVARIANT msgPropVar[NUMBEROFPROPERTIES];
    HRESULT msgStatus[NUMBEROFPROPERTIES];

    // Message body: the caller's buffer, sent in full.
    msgPropId[PROP_BODY] = PROPID_M_BODY;
    msgPropVar[PROP_BODY].vt = VT_VECTOR | VT_UI1;
    msgPropVar[PROP_BODY].caub.pElems = message.GetBuffer().get();
    msgPropVar[PROP_BODY].caub.cElems = static_cast<ULONG>(message.GetBufferSize());

    // Message label: empty. pwszVal is a non-const pointer, so it must point
    // at a writable array rather than at a string literal.
    WCHAR emptyLabel[] = L"";
    msgPropId[PROP_LABEL] = PROPID_M_LABEL;
    msgPropVar[PROP_LABEL].vt = VT_LPWSTR;
    msgPropVar[PROP_LABEL].pwszVal = emptyLabel;

    MQMSGPROPS msgProps;
    msgProps.cProp = NUMBEROFPROPERTIES;
    msgProps.aPropID = msgPropId;
    msgProps.aPropVar = msgPropVar;
    msgProps.aStatus = msgStatus;

    return MQSendMessage(_hQueue, &msgProps, MQ_NO_TRANSACTION);
}

NS_MESSAGE_MSMQ_END


Message.h


#pragma once

#include <cstring>

#include <memory>

#include <string>

#include "NS.h"


NS_MESSAGE_MSMQ_BEGIN

/// Owning byte buffer for the body of one MSMQ message.
///
/// For sending, construct it from the bytes to send. For receiving, construct
/// it with a capacity; the queue writes into GetBuffer() and records the
/// number of body bytes through SetReadSize().
class Message
{
private:
    std::unique_ptr<unsigned char[]> _buffer;
    int _bufferSize = 4096;
    int _readSize = 0;

public:
    /// Empty receive buffer with the default capacity of 4096 bytes.
    Message() : Message(4096)
    {
    }

    /// Zero-filled buffer of `bufferSize` bytes.
    Message(int bufferSize) : Message(nullptr, bufferSize)
    {
    }

    /// Buffer of `bufferSize` bytes initialized from `buffer`.
    /// `buffer` must provide at least `bufferSize` readable bytes; when it is
    /// null the buffer is left zero-filled.
    Message(const char* buffer, int bufferSize) : _bufferSize(bufferSize)
    {
        // make_unique value-initializes the array, so it starts out as zeros.
        _buffer = std::make_unique<unsigned char[]>(bufferSize);
        if (buffer != nullptr && bufferSize > 0)
            std::memcpy(_buffer.get(), buffer, static_cast<std::size_t>(bufferSize));
    }

    /// Buffer holding exactly the bytes of `item` (no terminating NUL).
    Message(std::string item)
    {
        _buffer = std::make_unique<unsigned char[]>(item.size());
        if (!item.empty())
            std::memcpy(_buffer.get(), item.data(), item.size());
        _bufferSize = static_cast<int>(item.size());
    }

    virtual ~Message() = default;

public:
    /// Direct access to the owning pointer, so the queue can fill the buffer.
    std::unique_ptr<unsigned char[]>& GetBuffer();

    /// Capacity of the buffer in bytes.
    int GetBufferSize() const;

    /// Text of the message body; see the definition below for the rules.
    std::string GetBody() const;

    /// Records how many bytes of the buffer hold a received body.
    void SetReadSize(int size);
};

inline std::unique_ptr<unsigned char[]>& Message::GetBuffer()
{
    return _buffer;
}

inline void Message::SetReadSize(int size)
{
    _readSize = size;
}

inline int Message::GetBufferSize() const
{
    return _bufferSize;
}

/// Returns the body as text.
///
/// Only the valid part of the buffer is examined: the first _readSize bytes
/// when a read size in (0, capacity) has been recorded, otherwise the whole
/// buffer; in both cases the text ends at the first NUL byte, if any.
/// When the text contains a "<string>...</string>" wrapper, the text between
/// the first opening tag and the last closing tag is returned. Without such a
/// wrapper the text is returned unchanged.
inline std::string Message::GetBody() const
{
    if (!_buffer || _bufferSize <= 0)
        return std::string();

    // Never read past the buffer, and ignore stale bytes beyond the last read.
    const std::size_t limit = static_cast<std::size_t>(
        (_readSize > 0 && _readSize < _bufferSize) ? _readSize : _bufferSize);

    const char* text = reinterpret_cast<const char*>(_buffer.get());
    const void* nul = std::memchr(text, '\0', limit);
    const std::size_t length =
        (nul != nullptr) ? static_cast<std::size_t>(static_cast<const char*>(nul) - text) : limit;

    const std::string body(text, length);

    const std::string openTag = "<string>";
    const std::string closeTag = "</string>";

    const std::size_t open = body.find(openTag);
    const std::size_t close = body.rfind(closeTag);
    if (open == std::string::npos || close == std::string::npos)
        return body;

    const std::size_t begin = open + openTag.size();
    if (close < begin)
        return body;

    return body.substr(begin, close - begin);
}

NS_MESSAGE_MSMQ_END


MSMQEnum.h


#pragma once


#include "NS.h"


NS_MESSAGE_MSMQ_BEGIN

/// Access flags passed to MQOpenQueue as its access argument.
/// Written as hexadecimal bit values; each enumerator is a distinct bit.
enum class MessageQueueAccess : int
{
    RECEIVE_ACCESS = 0x01,
    SEND_ACCESS    = 0x02,
    MOVE_ACCESS    = 0x04,
    PEEK_ACCESS    = 0x20,
    ADMIN_ACCESS   = 0x80
};

/// Direction a MessageQueue is opened for.
enum class MessageQueueMode : int
{
    READ  = 0x1, // queue is opened for receiving
    WRITE = 0x2  // queue is opened for sending
};

/// Share modes passed to MQOpenQueue as its share argument.
enum class MessageQueueShare : int
{
    DENY_NONE          = 0x0,
    DENY_RECEIVE_SHARE = 0x1
};

NS_MESSAGE_MSMQ_END

반응형

'개발관련 > C&C++' 카테고리의 다른 글

IOCP Socket Client 구현  (0) 2018.05.31
HttpClient  (0) 2018.05.25
Functional 이용한 델리게이트  (0) 2018.04.26
IOCP Socket Server 구현  (0) 2018.04.16
StringFormat  (0) 2018.03.20