IBM Integration Bus using Aggregation Nodes for fan out/fan in

Godfrey Menezes
Oct 2, 2020

Preface

A common requirement I have come across is requesting data from different systems and presenting it in a single place. An example is when you log in to your bank's page and see a single dashboard that shows your:

  1. Checking account summary.
  2. Saving account summary.
  3. Credit Card transaction summary.
  4. Auto loan summary.
  5. Home mortgage summary.

All these details may reside on different systems, and the common denominator may be the ID of the customer using these services. A closely related use case is when an operation needs data for multiple items but the external service allows only one request at a time. A case in point is finding the coordinates of the ATMs a bank has in the vicinity of a particular zip code when the end system (possibly Google Maps) allows only one lookup per request.

One way to work around this is to use the Aggregation nodes in IBM Integration Bus. In this scenario, the back-end application allows only one request at a time. The service we have built receives multiple stock symbols; we use the fan-out mechanism to create one stock quote request per symbol and then collect the replies into a single response in the fan-in.
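To make the fan-out/fan-in idea concrete before getting into the broker specifics, here is a minimal Python sketch of the same pattern. It is only an illustration, not part of the IIB solution: `get_quote` is a hypothetical stand-in for the single-symbol back-end service, and the thread pool plays the role that the queues and Aggregation nodes play in the flows.

```python
from concurrent.futures import ThreadPoolExecutor


def get_quote(symbol):
    # Hypothetical stand-in for the back-end service, which accepts one symbol per call.
    return {"Symbol": symbol, "Last": "N/A"}


def aggregate_quotes(symbols):
    # Fan out: issue one back-end request per symbol.
    with ThreadPoolExecutor(max_workers=4) as pool:
        replies = list(pool.map(get_quote, symbols))  # map() keeps the input order
    # Fan in: collect the individual replies into a single response.
    return {"StockQuotes": replies}


result = aggregate_quotes(["WASIX", "VDAIX"])
```

The caller sends one request with many symbols and gets one response back; the one-request-at-a-time limitation of the back end stays hidden inside `aggregate_quotes`.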

I am using the following web service as the back end application to request the stock quote and the WSDL for that operation is available here.

External/Backend service

In this instance we are using the WSDL to create the external service that will be called from IBM Integration Bus (IIB, formerly IBM WebSphere Message Broker). The service receives a request for a stock quote and replies with a SOAP response. Create a shared library that will hold the web service used to get the stock quote. I have named it ‘MFL_StockQuote’.

Create the message model that will have the WSDL specification.

Select the SOAP XML option, as ours is a WSDL.

The WSDL already exists, so specify it for the message model.

Specify the location of the WSDL, in this case the HTTP URL from where you can get the WSDL.

Select the operation that we intend to use for our application. In this case it is the SOAP operations. Click on ‘Finish’ to complete the operation and import the WSDL.

With the WSDL imported, create the sub flows that will invoke the external service and encapsulate the service invocation.

I have named it ‘SF_StockQuoteInvoke’.

Drag and drop the WSDL onto the canvas.

The subflow will invoke the web service, and the operation will be GetQuote.

To keep things simple, I have named this subflow SF_StockQuote.

After this is done, the library content should look like this. To simplify it, I have moved the subflows out of their schemas to the top level so that no schemas are used.

SF_StockQuote

SF_StockQuote invokes the external service using the details defined in the WSDL.

The properties of the SOAP Request node.

SF_StockQuoteInvoke

Coming back to SF_StockQuoteInvoke, I made the following changes to the flow after dragging the WSDL onto the canvas:

The SaveMQHdr compute node saves the incoming MQ header from the message that the Aggregate Request flow sends. The code snippet used is:

CREATE LASTCHILD OF Environment DOMAIN 'MQMD' NAME 'MQMD';
SET Environment.MQMD = InputRoot.MQMD;
SET OutputRoot = InputRoot;

The CreateMQResp compute node reattaches the MQ header that SaveMQHdr saved from the Aggregate Request flow's message. The response sent by the service is parsed to create a proper XMLNSC message. The code snippet used is:

SET OutputRoot.MQMD = Environment.MQMD;

DECLARE sh NAMESPACE 'http://www.webserviceX.NET/';
DECLARE data CHAR;
SET data = InputRoot.XMLNSC.sh:GetQuoteResponse.sh:GetQuoteResult;

IF (data = 'exception') THEN
  SET OutputRoot.XMLNSC.StockQuotes.Stock = 'exception';
ELSE
  -- PARSE takes a bit stream, so cast the character data to a BLOB before parsing
  CREATE LASTCHILD OF OutputRoot
    DOMAIN 'XMLNSC' PARSE (CAST(data AS BLOB CCSID InputRoot.Properties.CodedCharSetId)
    CCSID InputRoot.Properties.CodedCharSetId
    ENCODING 0);
END IF;
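For readers less familiar with ESQL, the logic of CreateMQResp can be sketched in Python. This is only an illustration under an assumption: that the service returns the quote as an XML document carried as escaped text inside GetQuoteResult, which is why it has to be parsed a second time. The function name `extract_quotes` and the `SAMPLE` payload are hypothetical.

```python
import xml.etree.ElementTree as ET

NS = {"sh": "http://www.webserviceX.NET/"}


def extract_quotes(response_xml):
    # First parse: the response body, whose root is sh:GetQuoteResponse.
    response = ET.fromstring(response_xml)
    data = response.findtext("sh:GetQuoteResult", namespaces=NS)
    if data == "exception":
        # Mirror the ESQL branch: emit a placeholder Stock element.
        quotes = ET.Element("StockQuotes")
        ET.SubElement(quotes, "Stock").text = "exception"
        return quotes
    # Second parse: the result text is itself an XML document.
    return ET.fromstring(data)


# Hypothetical sample response with the quote XML escaped inside GetQuoteResult.
SAMPLE = (
    '<GetQuoteResponse xmlns="http://www.webserviceX.NET/">'
    "<GetQuoteResult>&lt;StockQuotes&gt;&lt;Stock&gt;&lt;Symbol&gt;WASIX"
    "&lt;/Symbol&gt;&lt;/Stock&gt;&lt;/StockQuotes&gt;</GetQuoteResult>"
    "</GetQuoteResponse>"
)
quotes = extract_quotes(SAMPLE)
```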

Aggregation (Putting it together)

So far we have created the service that is going to be invoked. This service returns the stock quote. Next, create the Message Flow application that will contain the aggregation (fan out/fan in). The application is named ‘MFA_AggregationStockQuote’.

Reference the library that contains the stock quote service subflow.

After the application has been created, it should look like this:

Create the flows that will be needed.

MF_AggReq

This flow receives the request from the service consumer over WebSphere MQ. The message format looks like this:

<?xml version="1.0"?>
<StockQuote>
<Symbol>WASIX</Symbol>
<Symbol>VDAIX</Symbol>
</StockQuote>

The flow structure looks like this -

The Aggregate Control node is the start of the fan-out. The compute node creates the individual requests. After all the requests have been sent to AGG.IN, it sends the original message to AGG.CONT.IN. This original message can be used to relate the original request to the response.

The request from the consumer may contain the reply to queue and this is also saved for the Aggregate Reply.

DECLARE inRef REFERENCE TO InputRoot.XMLNSC.StockQuote.Symbol[1];
DECLARE sh NAMESPACE 'http://www.webserviceX.NET/';
WHILE LASTMOVE(inRef) DO
  SET OutputRoot.MQMD = InputRoot.MQMD;
  SET OutputRoot.XMLNSC.sh:GetQuote.sh:symbol = inRef;
  PROPAGATE TO TERMINAL 'out';
  MOVE inRef NEXTSIBLING REPEAT TYPE NAME;
END WHILE;
CALL CopyMessageHeaders();
SET OutputRoot.BLOB.BLOB = ASBITSTREAM(InputRoot.MQMD, InputRoot.Properties.Encoding, InputRoot.Properties.CodedCharSetId);
PROPAGATE TO TERMINAL 'out1';
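The splitting step performed by this compute node can be sketched in Python as follows. It is a rough illustration only: `fan_out` is a hypothetical name, the list it returns stands in for the messages propagated to the 'out' terminal, and the MQMD handling and the control message are left out.

```python
import xml.etree.ElementTree as ET

NS = "http://www.webserviceX.NET/"


def fan_out(request_xml):
    # Build one GetQuote request per Symbol element in the consumer's message.
    root = ET.fromstring(request_xml)
    requests = []
    for sym in root.findall("Symbol"):
        get_quote = ET.Element(f"{{{NS}}}GetQuote")
        ET.SubElement(get_quote, f"{{{NS}}}symbol").text = sym.text
        requests.append(get_quote)
    return requests


REQUEST = "<StockQuote><Symbol>WASIX</Symbol><Symbol>VDAIX</Symbol></StockQuote>"
requests = fan_out(REQUEST)
```

Each element of `requests` corresponds to one message that the flow would send towards the back-end service.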

MF_AggRep

The MF_AggRep flow receives all the responses from the services and then fans them in using the Aggregate Reply node. This flow also receives the reply from the control flow, which contains the original request. That request is used to restore the MQMD of the request. Once that is done, and if the MQMD contains the reply-to queue, the AGG.REP MQOutput node can be discarded and you may use the MQReply node instead.

Here is the code snippet used in the compute node -

DECLARE inRef REFERENCE TO InputRoot.ComIbmAggregateReplyBody.StockRep;
DECLARE I INT 1;
WHILE LASTMOVE(inRef) DO
  IF EXISTS(inRef.XMLNSC.StockQuotes[]) THEN
    SET OutputRoot.XMLNSC.StockQuotes.Stock[I] = inRef.XMLNSC.StockQuotes.Stock;
    SET I = I + 1;
  ELSE
    CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN('MQMD')
      PARSE(inRef.BLOB.BLOB
        CCSID inRef.Properties.CodedCharSetId
        TYPE 'MQMD');
  END IF;
  MOVE inRef NEXTSIBLING REPEAT TYPE NAME;
END WHILE;
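The merge performed in the fan-in can be sketched in Python like this. It is a simplified illustration, assuming each service reply is a StockQuotes document with one Stock element; `fan_in` and `REPLIES` are hypothetical names, and the control reply that carries the saved MQMD is not modelled.

```python
import xml.etree.ElementTree as ET


def fan_in(replies):
    # Collect the Stock elements from every reply under one StockQuotes root.
    merged = ET.Element("StockQuotes")
    for reply in replies:
        root = ET.fromstring(reply)
        if root.tag == "StockQuotes":
            merged.extend(root.findall("Stock"))
    return merged


REPLIES = [
    "<StockQuotes><Stock><Symbol>WASIX</Symbol></Stock></StockQuotes>",
    "<StockQuotes><Stock><Symbol>VDAIX</Symbol></Stock></StockQuotes>",
]
merged = fan_in(REPLIES)
symbols = [stock.findtext("Symbol") for stock in merged.findall("Stock")]
```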

MF_SvcInvoke

This flow calls the back-end service and then sends the response back to the reply-to queue specified in the MQMD.

MF_AggCont

This flow just passes the message on as it was received.

If the flow runs successfully, the output in the AGG.REP queue should look something like this:

<?xml version="1.0"?>
<StockQuotes>
<Stock>
<Symbol>WASIX</Symbol>
<Last>11.03</Last>
<Date>11/11/2015</Date>
<Time>6:45pm</Time>
<Change>-0.05</Change>
<Open>N/A</Open>
<High>N/A</High>
<Low>N/A</Low>
<Volume>0</Volume>
<MktCap>N/A</MktCap>
<PreviousClose>11.03</PreviousClose>
<PercentageChange>-0.45%</PercentageChange>
<AnnRange>N/A</AnnRange>
<Earns>N/A</Earns>
<P-E>N/A</P-E>
<Name>Wasatch Strategic Income Fund</Name>
</Stock>
<Stock>
<Symbol>VDAIX</Symbol>
<Last>31.23</Last>
<Date>11/11/2015</Date>
<Time>6:45pm</Time>
<Change>-0.11</Change>
<Open>N/A</Open>
<High>N/A</High>
<Low>N/A</Low>
<Volume>0</Volume>
<MktCap>N/A</MktCap>
<PreviousClose>31.23</PreviousClose>
<PercentageChange>-0.35%</PercentageChange>
<AnnRange>N/A</AnnRange>
<Earns>N/A</Earns>
<P-E>N/A</P-E>
<Name>Vanguard Dividend Appreciation </Name>
</Stock>
</StockQuotes>

The code base used for this can be found in the following location.
