Списки распространения ( модель "один ко многим" )
Использование механизма списков распространения (Distribution List) или так называемой модели "один ко многим" требуется, например, в случае рассылки большому количеству клиентов постоянно меняющейся информации (котировки акций, курсы валют, новости и т.п.). Этот механизм позволяет одной командой MQOPEN открыть множество очередей и одной командой MQPUT положить сообщения в эти очереди. После открытия очередей возвращается один уникальный идентификатор объекта и MQPUT помещает сообщения во все эти очереди, используя этот единственный идентификатор.
В версии WebSphere MQ 5.1 и выше object descriptor (MQOD) содержит поля, которые используются для списков распространения. Поле Object Descriptor RecsPresent содержит число Object Records (MQORs) и если оно больше чем 0, то это означает, что должен быть использован список распространения.
Рассмотрим этот механизм на примере задачи, когда WebSphere MQ server помещает сообщения в N очередей, как показано на рис.9.3. Эти сообщения могут дальше уходить через remote queue или их может забирать WebSphere MQ client с заданной периодичностью. Назовем нашу программу distlist.exe, файл с текстом сообщения distlist.dat и файл инициализации distlist.ini, в котором 1-я строка – имя менеджера, 2-я и последующие строки – имена очередей, как показано ниже.
QM_ ALFA Queue_ Moscow Queue_ Kiev Queue_ Alma-Ata Queue_ SPetersburg Queue_ Novosibirsk Queue_ Saratov
//last string must be blank
Рис. 9.3. Механизм Distribution List для WebSphere MQ
Ниже приводится листинг программы distlist.cpp для Microsoft Visual C++ ver.6.0.
Листинг 9.3. Программа distlist.cpp для Microsoft Visual C++ ver.6.0. (html, txt)
В завершение раздела можно сказать, что время работы механизма Distribution List для WebSphere MQ с 200, 400, 600 и т.д. очередями не зависит от производительности компьютера и не сильно зависит от количества очередей (для Notpersistent queue, persistent queue не целесообразно использовать для данной задачи ). Это наглядно видно из следующей таблицы, отражающей время работы distlist (сек) в зависимости от оперативной памяти (ОП) компьютера: 512Мбт и 1Гбт.
static void print_usage(void); static void print_responses( char * comment, PMQRR pRR, MQLONG NumQueues, PMQOR pOR);
int main(int argc, char **argv) { typedef enum {False, True} Bool; MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQHCONN Hcon; /* connection handle */ MQHOBJ Hobj; /* object handle */ MQLONG O_options; /* MQOPEN options */ MQLONG C_options; /* MQCLOSE options */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ MQCHAR48 QManager; /* queue manager name */ MQLONG buflen; /* buffer length */ char buffer[101]; /* message buffer */ MQLONG Index ; /* Index into list of queues */ MQLONG NumQueues ; /* Number of queues */ PMQRR pRR=NULL; /* Pointer to response records */ PMQOR pOR=NULL; /* Pointer to object records */ Bool DisconnectRequired=False;/* Already connected switch */ Bool Connected=False; /* Connect succeeded switch */
typedef struct { MQBYTE24 MsgId; MQBYTE24 CorrelId; } PutMsgRec, *pPutMsgRec; pPutMsgRec pPMR=NULL; /* Pointer to put msg records */ MQLONG PutMsgRecFields=MQPMRF_MSG_ID | MQPMRF_CORREL_ID;
/* Open ini file and setting value */ if ( (fptr=fopen ("distlist.ini","r" )) == NULL ) {printf("Cannot open distlist.ini file" ); print_usage(); exit(1); } else{ fgets(QManager, 48, fptr); queuenamelen = strlen(QManager) - 1; QManager[queuenamelen] = ' '; NumQueues = 0; while (queuenamelen != 0) { fgets(queue[NumQueues], 48, fptr); queuenamelen = strlen(queue[NumQueues]) - 1; queue[NumQueues][queuenamelen] = ' '; NumQueues++; } } fclose (fptr); --NumQueues; /* NumQueues - Number of Queue name */ /* Allocate response records, object records and put message records */ pRR = (PMQRR)malloc( NumQueues * sizeof(MQRR)); pOR = (PMQOR)malloc( NumQueues * sizeof(MQOR)); pPMR = (pPutMsgRec)malloc( NumQueues * sizeof(PutMsgRec));
if((NULL == pRR) || (NULL == pOR) || (NULL == pPMR)) { printf("%s(%d) malloc failed\n", __FILE__, __LINE__); exit(4); }
/* Use parameters as the name of the target queues */
for( Index = 0 ; Index < NumQueues ; Index ++) { strncpy( (pOR+Index)->ObjectName, queue[Index], (size_t)MQ_Q_NAME_LENGTH); strncpy( (pOR+Index)->ObjectQMgrName, QManager, (size_t)MQ_Q_MGR_NAME_LENGTH); }
for( Index = 0 ; Index < NumQueues ; Index ++) { MQCONN((pOR+Index)->ObjectQMgrName, &Hcon, &((pRR+Index)->CompCode), &((pRR+Index)->Reason)); if ((pRR+Index)->CompCode == MQCC_FAILED) { continue; } if ((pRR+Index)->CompCode == MQCC_OK) { DisconnectRequired = True ; } Connected = True; break ; } /* Print any non zero responses */ print_responses("MQCONN", pRR, Index, pOR);
/* Print If failed to connect to queue manager then exit. */ if( False == Connected ) { printf("Unable to connect to queue manager\n"); exit(3) ; }
if ( (fp=fopen ("distlist.dat","r" )) == NULL ) {printf("Cannot open distlist.dat file" ); exit(2); } else{ fgets(buffer, 100, fptr); buflen = (MQLONG)strlen(buffer); /* length without null */ if (buffer[buflen-1] == '\n') /* last char is a new-line */ { buffer[buflen-1] = '\0'; /* replace new-line with null */ --buflen; /* reduce buffer length */ } } fclose (fp);
tmr = time(NULL); strcpy ( buf, ctime(&tmr)); buf[strlen(buf)-5]=0; printf("Distlist start send message to list queue %s\n", buf);
/* Open the target message queue for output */ od.Version = MQOD_VERSION_2 ; od.RecsPresent = NumQueues ; od.ObjectRecPtr = pOR; od.ResponseRecPtr = pRR ; O_options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason); if (Reason == MQRC_MULTIPLE_REASONS) { print_responses("MQOPEN", pRR, NumQueues, pOR); } else { if (Reason != MQRC_NONE) { printf("MQOPEN returned CompCode=%d, Reason=%d\n", OpenCode, Reason); } }
/* Read message from the file /* Loop until null line or end of file, or there is a failure */ CompCode = OpenCode; /* use MQOPEN result for initial test */