`
touchinsert
  • 浏览: 1290931 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Microsoft Message Queuing介绍

阅读更多

http://msdn2.microsoft.com/en-us/library/ms711472.aspx

<!--[if supportFields]><span lang=EN-US style='mso-fareast-font-family:SimSun;mso-fareast-language:ZH-CN'><span style='mso-element:field-begin'></span><span style='mso-spacerun:yes'></span>TOC \o &quot;1-3&quot; \h \z \u <span style='mso-element:field-separator'></span></span><![endif]-->

1目的

Message Queuing (MSMQ) 消息队列技术可以实现应用系统间的网络数据传输。应用程序可以发送消息给队列,或从队列中读取消息。

<!--[if gte vml 1]><v:shapetype id="_x0000_t75" coordsize="21600,21600" o:spt="75" o:preferrelative="t" path="m@4@5l@4@11@9@11@9@5xe" filled="f" stroked="f"> <v:stroke joinstyle="miter" /> <v:formulas> <v:f eqn="if lineDrawn pixelLineWidth 0" /> <v:f eqn="sum @0 1 0" /> <v:f eqn="sum 0 0 @1" /> <v:f eqn="prod @2 1 2" /> <v:f eqn="prod @3 21600 pixelWidth" /> <v:f eqn="prod @3 21600 pixelHeight" /> <v:f eqn="sum @0 0 1" /> <v:f eqn="prod @6 1 2" /> <v:f eqn="prod @7 21600 pixelWidth" /> <v:f eqn="sum @8 21600 0" /> <v:f eqn="prod @7 21600 pixelHeight" /> <v:f eqn="sum @10 21600 0" /> </v:formulas> <v:path o:extrusionok="f" gradientshapeok="t" o:connecttype="rect" /> <o:lock v:ext="edit" aspectratio="t" /> </v:shapetype><v:shape id="_x0000_i1025" type="#_x0000_t75" style='width:222pt; height:138pt'> <v:imagedata src="file:///C:\DOCUME~1\趙磊\LOCALS~1\Temp\msohtml1\01\clip_image001.png" o:title="" /> </v:shape><![endif]--><!--[if !vml]--><!--[endif]-->

消息队列保证消息的传递,路由和安全。它可以实现高性能的异步数据传输。

Message Queuing应用程序可以使用C++ API或者COM对象。最新版MSMQ 3.0可安装到Windows XP ProfessionalWindows Server 2003 family

2.队

队列是一种逻辑容器,可被用来保存和转发消息。应用程序可以创建一个队列,指定一个已有队列,打开一个队列,发送消息给队列,也可以从队列中读取消息, 设置,获取队列属性。

目标队列是一种消息队列。用来保存消息。目标队列被设置在服务器中,客户端程序发送消息给服务器计算机的目标队列,服务器程序则可以接收消息。

<!--[if gte vml 1]><v:shape id="_x0000_i1026" type="#_x0000_t75" style='width:291.75pt;height:125.25pt'> <v:imagedata src="file:///C:\DOCUME~1\趙磊\LOCALS~1\Temp\msohtml1\01\clip_image003.png" o:title="" /> </v:shape><![endif]--><!--[if !vml]--><!--[endif]-->

2.1 列路径名

队列路径名提供创建消息队列的信息。队列路径名指定了消息将要保存到队列的计算机名。要创建一个队列,必须要指定队列路径名。

2.2 队列查询

查询一个队列时,可以通过directory service

<!--[if gte vml 1]><v:shape id="_x0000_i1027" type="#_x0000_t75" style='width:285.75pt;height:192.75pt'> <v:imagedata src="file:///C:\DOCUME~1\趙磊\LOCALS~1\Temp\msohtml1\01\clip_image005.png" o:title="" /> </v:shape><![endif]--><!--[if !vml]--><!--[endif]-->

消息队列提供了两个结构体来设置查询条件MQPROPERTYRESTRICTION MQRESTRICTION.

2.3 消息体类型

VT_I11个字符(1 byte)

VT_I2short integer.

VT_I4long integer.

VT_R4A float (4-byte floating point).

VT_R8A double-float (8-byte floating point).

VT_CYA CURRENCY type (8-byte).

VT_DATEAn OLE DATE (8-byte) type.

VT_BSTRUnicode的字符串

VT_ARRAY | VT_UI11bytes.

2.4 Format名称

Format名称提供了需要打开队列的信息。Format名称是在队列被创建时生成的唯一名称。获得Format名称的方法是:

1.当创建队列,会返回一个Format名称。

2.当查询一个队列时,应用程序可以使用API函数获得队列路径名或者队列标识(GUID).,然后可以通过MQPathNameToFormatNameMQInstanceToFormatName把它们转换为format名称。

3.实例

<!--[if gte vml 1]><v:shape id="_x0000_i1028" type="#_x0000_t75" style='width:274.5pt;height:300pt'> <v:imagedata src="file:///C:\DOCUME~1\趙磊\LOCALS~1\Temp\msohtml1\01\clip_image007.png" o:title="" /> </v:shape><![endif]--><!--[if !vml]--><!--[endif]-->

开发消息队列应用程序之前保证:

<!--[if !supportLists]-->l <!--[endif]-->已经部署了消息队列。

<!--[if !supportLists]-->l <!--[endif]-->应用程序已经引用了相应的头文件和mqrt.lib库。

3.1创建一个队列

<!--[if !supportLists]-->1. <!--[endif]-->定一个MQQUEUEPROPS结构体;

<!--[if !supportLists]-->2. <!--[endif]-->设置队列属性PROPID_Q_PATHNAME

<!--[if !supportLists]-->3. <!--[endif]-->初始化MQQUEUEPROPS结构体;

<!--[if !supportLists]-->4. <!--[endif]-->调用MQCreateQueue创建队列。

参考4.1 代码实现

3.2 寻找一个队列

<!--[if !supportLists]-->1. <!--[endif]-->指定寻找条件,使用查询结构体MQPROPERTYRESTRICTION MQRESTRICTION.

<!--[if !supportLists]-->2. <!--[endif]-->使用MQCOLUMNSET指定队列的属性;

<!--[if !supportLists]-->3. <!--[endif]-->调用MQLocateBegin开始查询;

<!--[if !supportLists]-->4. <!--[endif]-->循环中调用MQLocateNext查找下一个;

<!--[if !supportLists]-->5. <!--[endif]-->调用MQLocateEnd释放资源。

参考4.2 代码实现

3.3 打开队列

1.获得队列的Format名;

2.调用MQOpenQueue

3.返回队列句柄。

参考4.3 代码实现。

3.4 读取消息

读取消息有两种,1是同步读取,2时异步读取

同步读取消息时

1.定义队列属性最大值;

2.定义MQMSGPROPS结构体;

3.指定消息属性。如LABELLABEL长度,BODYBODY长度,和BODY类型;

4.实例化MQMSGPROPS结构体;

5.创建队列的format名称;

6.调用MQOpenQueue打开队列;

7.循环调用MQReceiveMessage

8.调用MQCloseQueue释放资源。

参考4.4代码实现

3.5 发送消息

发送消息时:

<!--[if !supportLists]-->1. <!--[endif]-->定义需要的常量和变量;

<!--[if !supportLists]-->2. <!--[endif]-->定义MQMSGPROPS结构体;

<!--[if !supportLists]-->3. <!--[endif]-->指定消息属性,例如LABEL

<!--[if !supportLists]-->4. <!--[endif]-->初始化MQMSGPROPS结构体;

<!--[if !supportLists]-->5. <!--[endif]-->创建format名称打开队列;

<!--[if !supportLists]-->6. <!--[endif]-->调用MQSendMessage发送消息;

<!--[if !supportLists]-->7. <!--[endif]-->调用MQCloseQueue关闭资源。

参考4.5代码实现


4.代码实现

4.1建一个

HRESULT CreateMSMQQueue(

LPWSTR wszPathName,

PSECURITY_DESCRIPTOR pSecurityDescriptor,

LPWSTR wszOutFormatName,

DWORD *pdwOutFormatNameLength

)

{

// 定义队列最大属性.

const int NUMBEROFPROPERTIES = 2;

// 定义属性

MQQUEUEPROPS QueueProps;

MQPROPVARIANT aQueuePropVar[NUMBEROFPROPERTIES];

QUEUEPROPID aQueuePropId[NUMBEROFPROPERTIES];

HRESULT aQueueStatus[NUMBEROFPROPERTIES];

HRESULT hr = MQ_OK;

// Validate the input parameters.

if (wszPathName == NULL || wszOutFormatName == NULL || pdwOutFormatNameLength == NULL)

{

return MQ_ERROR_INVALID_PARAMETER;

}

// 设置属性

DWORD cPropId = 0;

aQueuePropId[cPropId] = PROPID_Q_PATHNAME;

aQueuePropVar[cPropId].vt = VT_LPWSTR;

aQueuePropVar[cPropId].pwszVal = wszPathName;

cPropId++;

WCHAR wszLabel[MQ_MAX_Q_LABEL_LEN] = L"Test Queue";

aQueuePropId[cPropId] = PROPID_Q_LABEL;

aQueuePropVar[cPropId].vt = VT_LPWSTR;

aQueuePropVar[cPropId].pwszVal = wszLabel;

cPropId++;

// 初始化MQQUEUEPROPS 结构体.

QueueProps.cProp = cPropId; // Number of properties

QueueProps.aPropID = aQueuePropId; // IDs of the queue properties

QueueProps.aPropVar = aQueuePropVar; // Values of the queue properties

QueueProps.aStatus = aQueueStatus; // Pointer to the return status

// 调用MQCreateQueue创建队列

WCHAR wszFormatNameBuffer[256];

DWORD dwFormatNameBufferLength = sizeof(wszFormatNameBuffer)/sizeof(wszFormatNameBuffer[0]);

hr = MQCreateQueue(pSecurityDescriptor, // Security descriptor

&QueueProps, // Address of queue property structure

wszFormatNameBuffer, // Pointer to format name buffer

&dwFormatNameBufferLength); // Pointer to receive the queue's format name length

// Return the format name if the queue is created successfully.

if (hr == MQ_OK || hr == MQ_INFORMATION_PROPERTY)

{

if (*pdwOutFormatNameLength >= dwFormatNameBufferLength)

{

wcsncpy(wszOutFormatName, wszFormatNameBuffer, *pdwOutFormatNameLength - 1);

wszOutFormatName[*pdwOutFormatNameLength - 1] = L'\0';

*pdwOutFormatNameLength = dwFormatNameBufferLength;

}

else

{

wprintf(L"The queue was created, but its format name cannot be returned.\n");

}

}

return hr;

}

4.2寻找一个队列

// Define the MQPROPERTYRESTRICTION

// and MQRESTRICTION structures.

const int MAX_PROPERTIES = 13; // 13 possible queue properties

CLSID PRINTER_SERVICE_TYPE = // Dummy GUID

{0x1, 0x2, 0x3, {0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb}};

HRESULT hr = MQ_OK;

MQPROPERTYRESTRICTION PropertyRestriction;

MQRESTRICTION Restriction;

//////////////////////////////////////////

// Set search criteria according to the

// type of service provided by the queue.

/////////////////////////////////////////

// Set the queue restriction to PROPID_Q_TYPE = PRINTER_SERVICE_TYPE.

PropertyRestriction.rel = PREQ;

PropertyRestriction.prop = PROPID_Q_TYPE;

PropertyRestriction.prval.vt = VT_CLSID;

PropertyRestriction.prval.puuid = &PRINTER_SERVICE_TYPE;

// Specify one property restriction.

Restriction.cRes = 1;

Restriction.paPropRes = &PropertyRestriction;

// Set a MQCOLUMNSET structure to specify

// the properties to be returned:

// PROPID_Q_INSTANCE and PROPID_Q_CREATE_TIME.

MQCOLUMNSET Column;

PROPID aPropId[2]; // Only two properties to retrieve

DWORD dwColumnCount = 0;

aPropId[dwColumnCount] = PROPID_Q_INSTANCE;

dwColumnCount++;

aPropId[dwColumnCount] = PROPID_Q_CREATE_TIME;

dwColumnCount++;

Column.cCol = dwColumnCount;

Column.aCol = aPropId;

// Call MQLocateBegin to start a query.

HANDLE hEnum = NULL;

hr = MQLocateBegin(

NULL, //Start search at the top

&Restriction, //Search criteria

&Column, //Properties to return

NULL, //No sort order

&hEnum //Enumeration handle

);

if (FAILED(hr))

{

//

// Error handling.

//

}

// Call MQLocateNext in a loop to examine the

// query results.

MQPROPVARIANT aPropVar[MAX_PROPERTIES];

DWORD cProps, index;

do

{

cProps = MAX_PROPERTIES;

hr = MQLocateNext(

hEnum, // Handle returned by MQLocateBegin

&cProps, // Size of aPropVar array

aPropVar // An array of MQPROPVARIANT for results

);

if (FAILED(hr))

{

break;

}

for (index = 0; index < cProps; index += dwColumnCount)

{

// Process properties of a queue stored in:

// aPropVar[index], aPropVar[index + 1], …,

// aPropVar[index + dwColumnCount - 1].

}

} while (cProps > 0);

// Call MQLocateEnd to end query.

hr = MQLocateEnd(hEnum); // Handle returned by MQLocateBegin.

if (FAILED(hr))

{

//

//Error handling

//

}

4.3打开队列

int OpenMyQueue(

LPWSTR wszPathName,

DWORD dwAccess,

DWORD dwShareMode,

QUEUEHANDLE *phQueue

)

{

HRESULT hr = MQ_OK;

// Validate the input parameters.

if ((wszPathName == NULL) || (phQueue == NULL))

{

return MQ_ERROR_INVALID_PARAMETER;

}

// Call MQPathNameToFormatName to obtain the format name required

// to open the queue.

DWORD dwFormatNameBufferLength = 256; // Length of the format name buffer

WCHAR wszFormatNameBuffer[256]; // Format name buffer

hr = MQPathNameToFormatName(wszPathName,

wszFormatNameBuffer,

&dwFormatNameBufferLength);

if (FAILED(hr))

{

fprintf(stderr, "An error occurred in MQPathNameToFormatName (error: 0x%x).\n", hr);

return hr;

}

// Call MQOpenQueue to open the queue with the access and

// share mode provided by the caller.

hr = MQOpenQueue(

wszFormatNameBuffer, // Format name of the queue

dwAccess, // Access mode

dwShareMode, // Share mode

phQueue // OUT: Queue handle

);

if (FAILED(hr))

{

fprintf(stderr, "An error occurred in MQOpenQueue (error: 0x%x.)\n",hr);

return hr;

}

return hr;

}

4.4 读取消息

HRESULT ReadingDestQueue(

WCHAR * wszQueueName,

WCHAR * wszComputerName

)

{

// Define the required constants and variables.

const int NUMBEROFPROPERTIES = 5;

DWORD cPropId = 0;

HRESULT hr = MQ_OK; // Return code

HANDLE hQueue = NULL; // Queue handle

ULONG ulBufferSize = 2;

// Define an MQMSGPROPS structure.

MQMSGPROPS msgprops;

MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];

MQPROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];

HRESULT aMsgStatus[NUMBEROFPROPERTIES];

// Specify the message properties to be retrieved.

aMsgPropId[cPropId] = PROPID_M_LABEL_LEN; // Property ID

aMsgPropVar[cPropId].vt = VT_UI4; // Type indicator

aMsgPropVar[cPropId].ulVal = MQ_MAX_MSG_LABEL_LEN; // Length of label

cPropId++;

WCHAR wszLabelBuffer[MQ_MAX_MSG_LABEL_LEN]; // Label buffer

aMsgPropId[cPropId] = PROPID_M_LABEL; // Property ID

aMsgPropVar[cPropId].vt = VT_LPWSTR; // Type indicator

aMsgPropVar[cPropId].pwszVal = wszLabelBuffer; // Label buffer

cPropId++;

UCHAR * pucBodyBuffer = NULL;

pucBodyBuffer = (UCHAR*)malloc(ulBufferSize);

if (pucBodyBuffer == NULL)

{

return MQ_ERROR_INSUFFICIENT_RESOURCES;

}

memset(pucBodyBuffer, 0, ulBufferSize);

aMsgPropId[cPropId] =

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics