View Javadoc

1   /*
2      Licensed under the Apache License, Version 2.0 (the "License");
3      you may not use this file except in compliance with the License.
4      You may obtain a copy of the License at
5   
6        http://www.apache.org/licenses/LICENSE-2.0
7   
8      Unless required by applicable law or agreed to in writing, software
9      distributed under the License is distributed on an "AS IS" BASIS,
10     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11     See the License for the specific language governing permissions and
12     limitations under the License.
13  */
14  package uk.nhs.interoperability.consumer.appemulator;
15  
16  import java.io.IOException;
17  import java.io.InputStream;
18  import java.util.HashMap;
19  import java.util.Map;
20  import java.util.Properties;
21  import java.util.Queue;
22  import java.util.Scanner;
23  import java.util.UUID;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.LinkedBlockingQueue;
27  
28  import uk.nhs.interoperability.consumer.AbstractSimpleMessageServlet;
29  import uk.nhs.interoperability.consumer.ITKMessageConsumer;
30  import uk.nhs.interoperability.infrastructure.ITKAddressImpl;
31  import uk.nhs.interoperability.infrastructure.ITKMessageProperties;
32  import uk.nhs.interoperability.infrastructure.ITKMessagingException;
33  import uk.nhs.interoperability.payload.ITKMessage;
34  import uk.nhs.interoperability.payload.ITKSimpleMessageResponse;
35  import uk.nhs.interoperability.payload.SimpleMessage;
36  import uk.nhs.interoperability.source.ITKMessageSender;
37  import uk.nhs.interoperability.source.ITKMessageSenderImpl;
38  import uk.nhs.interoperability.transform.TransformManager;
39  import uk.nhs.interoperability.util.HL7Utils;
40  import uk.nhs.interoperability.util.Logger;
41  
42  /**
43   * The Class ConsumerApplicationEmulator.
44   *
45   * @author Michael Odling-Smee
46   * @author Nicholas Jones
47   * @since 0.1
48   */
49  public class ConsumerApplicationEmulator extends AbstractSimpleMessageServlet 
50  			implements ITKMessageConsumer, Runnable {
51  
52  	/** The Constant serialVersionUID. */
53  	private static final long serialVersionUID = -6555559685313912541L;
54  
55  	/** The Constant FROMADDRESS. */
56  	private static final String FROMADDRESS = "urn:nhs-uk:addressing:ods:TESTORGS:ORGA";
57  
58  	/** The executor service. */
59  	private ExecutorService executorService;
60  	
61  	/** The is running. */
62  	private boolean isRunning = true;
63  	
64  	/** The itk message sender. */
65  	private ITKMessageSender itkMessageSender;
66  	
67  	/** The props. */
68  	private Properties props = new Properties();
69  	
70  	/** The audit identity. */
71  	private String auditIdentity;
72  	
73  	/*
74  	 * Construct an in-memory queue to queue ITK messages request for
75  	 * subsequent asynchronous processing - this emulates a common
76  	 * pattern in many application where inbound messages are queued
77  	 * for processing 
78  	 */
79  	/** The async processing queue. */
80  	private Queue<ITKMessage> asyncProcessingQueue = new LinkedBlockingQueue<ITKMessage>();
81  	
82  	/* (non-Javadoc)
83  	 * @see uk.nhs.interoperability.consumer.AbstractSimpleMessageServlet#getMessageConsumer()
84  	 */
85  	@Override
86  	public ITKMessageConsumer getMessageConsumer() {
87  		// TODO Auto-generated method stub
88  		return this;
89  	}
90  
91  	/**
92  	 * Instantiates a new consumer application emulator.
93  	 */
94  	public ConsumerApplicationEmulator() {
95  		this.executorService = Executors.newFixedThreadPool(1);
96  		this.executorService.execute(this);
97  		this.itkMessageSender = new ITKMessageSenderImpl();
98  		this.props = new Properties();
99  		try {
100 			props.load(this.getClass().getResourceAsStream("/consumeremulator.properties"));
101 		} catch (IOException e) {
102 			Logger.error("Could not load consumer emulator properties - emulator not likely to behave correctly", e);
103 		}
104 		this.auditIdentity = this.props.getProperty("audit.identity");
105 	}
106 
107 	/* (non-Javadoc)
108 	 * @see uk.nhs.interoperability.consumer.ITKMessageConsumer#onSyncMessage(uk.nhs.interoperability.payload.ITKMessage)
109 	 */
110 	@Override
111 	public ITKMessage onSyncMessage(ITKMessage request) throws ITKMessagingException {
112 		Logger.debug("Application invoked");
113 		ITKMessage response = this.processMessage(request);
114 		response.getMessageProperties().setFromAddress(new ITKAddressImpl(FROMADDRESS));
115 
116 		return response;
117 	}
118 	
119 	/* (non-Javadoc)
120 	 * @see uk.nhs.interoperability.consumer.ITKMessageConsumer#onMessage(uk.nhs.interoperability.payload.ITKMessage)
121 	 */
122 	@Override
123 	public void onMessage(ITKMessage request) throws ITKMessagingException {
124 		//Do some (really) basic validation of the message
125 		if (request == null) {
126 			throw new ITKMessagingException("The request was null - could not process");
127 		}
128 		if (request.getBusinessPayload() == null && request.getMessageProperties() == null) {
129 			throw new ITKMessagingException(request.getMessageProperties(), ITKMessagingException.INVALID_MESSAGE_CODE, "The request message properties or contents were null - message cannot be processed");
130 		}
131 		//Queue for asynchronous processing
132 		this.asyncProcessingQueue.add(request);
133 	}
134 	
135 	/**
136 	 * Process async message.
137 	 *
138 	 * @param request the request
139 	 */
140 	private void processAsyncMessage(ITKMessage request) {
141 		Logger.debug("Processing queued itk request");
142 		try {
143 			ITKMessage response = this.processMessage(request);
144 			if (response != null) {
145 				// TODO: Consider this decision to turn this into a send request rather than sendAsync
146 				response.getMessageProperties().setFromAddress(new ITKAddressImpl(FROMADDRESS));
147 				this.itkMessageSender.send(response);
148 			} else {
149 				Logger.info("No response configured/created for " + request);
150 			}
151 		} catch (ITKMessagingException e) {
152 			/*
153 			 * In a real application some more in-depth error
154 			 * handling may occur such as storing the failed message
155 			 * for attention of an administrator, however for this
156 			 * simple application emulator we just log an error
157 			 */
158 			Logger.error("Could not send aysnchronous response", e);
159 		}
160 	}
161 	
162 	/**
163 	 * Process message.
164 	 *
165 	 * @param request the request
166 	 * @return the iTK message
167 	 * @throws ITKMessagingException the iTK messaging exception
168 	 */
169 	private ITKMessage processMessage(ITKMessage request) throws ITKMessagingException {
170 		
171 		String requestMsgService = request.getMessageProperties().getServiceId();	
172 		/*
173 		 * This simple emulator application's response behaviour is determined
174 		 * via configuration - more sophisticated measures are likely to exist in
175 		 * a real world implementation
176 		 */
177 		String responseType = this.props.getProperty(requestMsgService + ".response.type");
178 		
179 		//Conditional logic depending on type of response
180 		if (responseType == null) {
181 			//Ooops no response configured
182 			throw new ITKMessagingException("Incorrect emulator configuration - no response type configured for " + requestMsgService);
183 			
184 		} else if (responseType.equalsIgnoreCase("simple")) {
185 			//Simple message response - can be generated directly
186 			Logger.trace("Creating a simple message response");
187 			return new ITKSimpleMessageResponse(request.getMessageProperties(), true);
188 			
189 		} else if (responseType.equalsIgnoreCase("fullResponse")) {
190 			//Full business response - construct via XSLT
191 			String responseProfileId = this.props.getProperty(request.getMessageProperties().getServiceId() + "Response.profileId");
192 			Logger.trace("Creating a business response");
193 			return this.turnaroundViaXSLT(responseProfileId, request);
194 			
195 		} else if (responseType.equalsIgnoreCase("fixedResponse")) {
196 			// Fixed response - construct via fixed response for message type
197 			String responseProfileId = this.props.getProperty(request.getMessageProperties().getServiceId() + "Response.profileId");
198 			Logger.trace("Creating a Fixed response");
199 			return this.turnaroundViaFile(responseProfileId, request);
200 
201 		} else if (responseType.equalsIgnoreCase("businessAck")) {
202 			//Create a business ack if required
203 			return this.createBusinessAck(request);
204 		}  else if (responseType.equalsIgnoreCase("none")) {
205 			//No business response is expected
206 			return null;
207 		} else {
208 			//Whilst something was configured the value was unexpected
209 			throw new ITKMessagingException("Incorrect emulator configuration - unknown response type (" + responseType + ") configured for " + requestMsgService);
210 		}
211 	}
212 	
213 	/**
214 	 * Creates the business ack.
215 	 *
216 	 * @param request the request
217 	 * @return the iTK message
218 	 * @throws ITKMessagingException the iTK messaging exception
219 	 */
220 	private ITKMessage createBusinessAck(ITKMessage request) throws ITKMessagingException {
221 		//Create a businessAck if requested
222 		String businessAckHandlingSpec = request.getMessageProperties().getHandlingSpecification(ITKMessageProperties.BUSINESS_ACK_HANDLING_SPECIFICATION_KEY);
223 		if (businessAckHandlingSpec != null && businessAckHandlingSpec.equals("true")) {
224 			String businessAckService = "urn:nhs-itk:services:201005:SendBusinessAck-v1-0";
225 			String responseProfileId = this.props.getProperty(businessAckService + ".profileId");
226 			Logger.trace("Creating a business response");
227 			ITKMessage msg = this.turnaroundViaXSLT(responseProfileId, request);
228 			msg.getMessageProperties().setServiceId(businessAckService);
229 			return msg;
230 		} else {
231 			Logger.trace("No handling specification for business ack - not creating a business Ack");
232 		}
233 		//Otherwise no response is required
234 		return null;
235 	}
236 	
237 	/**
238 	 * Turnaround via xslt.
239 	 *
240 	 * @param responseProfileId the response profile id
241 	 * @param request the request
242 	 * @return the iTK message
243 	 * @throws ITKMessagingException the iTK messaging exception
244 	 */
245 	private ITKMessage turnaroundViaXSLT(String responseProfileId, ITKMessage request) throws ITKMessagingException {
246 		//Use simple XSLTs to create appropriate responses
247 		String xslt = this.props.getProperty(request.getMessageProperties().getServiceId() + ".turnaround.xslt");
248 		if (xslt != null) {
249 			
250 			//Create a response using the message properties from the request
251 			SimpleMessage msg = new SimpleMessage(request.getMessageProperties(), this.auditIdentity, responseProfileId, true);
252 			UUID messageId = UUID.randomUUID();
253 			msg.getMessageProperties().setBusinessPayloadId(messageId.toString().toUpperCase());
254 			
255 			Logger.trace("Using " + xslt + " to turnaround request");
256 			String inputXML = request.getBusinessPayload();
257 			//Logger.trace("XSLT input " + inputXML);
258 			String outputXML = TransformManager.doTransform(xslt, inputXML, this.getTransformParameters(msg));
259 			//Logger.trace("XSLT output " + outputXML);			
260 			msg.setBusinessPayload(outputXML);
261 			
262 			return msg;			
263 		} else {
264 			Logger.warn("Could not use XSLT to turnaround request");
265 		}
266 		return null;
267 	}
268 	
269 	/**
270 	 * Turnaround via file.
271 	 *
272 	 * @param responseProfileId the response profile id
273 	 * @param request the request
274 	 * @return the iTK message
275 	 * @throws ITKMessagingException the iTK messaging exception
276 	 */
277 	private ITKMessage turnaroundViaFile(String responseProfileId, ITKMessage request) throws ITKMessagingException {
278 		
279 		//Use fixed file response
280 		String fileName = this.props.getProperty(request.getMessageProperties().getServiceId() + ".turnaround.txt");
281 		if (fileName != null) {
282 			
283 			
284 			//Create a response using the message properties from the request
285 			SimpleMessage msg = new SimpleMessage(request.getMessageProperties(), this.auditIdentity, responseProfileId, true);
286 			UUID messageId = UUID.randomUUID();
287 			msg.getMessageProperties().setBusinessPayloadId(messageId.toString().toUpperCase());
288 			
289 			Logger.trace("Using " + fileName + " to turnaround request");
290 			String responseString;
291 			try {
292 				responseString = readFile(fileName);
293 			} catch (IOException e) {
294 				// TODO Auto-generated catch block
295 				e.printStackTrace();
296 				throw new ITKMessagingException("Incorrect emulator configuration - error reading response file (" + fileName);
297 			}
298 			msg.setBusinessPayload(responseString);
299 			// TODO: How does this work for PH i.e. Base64 - maybe different problems for Sync / Async response.....
300 			return msg;			
301 		} else {
302 			Logger.warn("Could not use XSLT to turnaround request");
303 		}
304 		return null;
305 	}
306 	
307 	/**
308 	 * Extract some common properties from the newly created
309 	 * message to pass to the XSLT transform as a map of parameters.
310 	 *
311 	 * @param itkMessage The skeleton ItkMessage whose contents will be created
312 	 * as a result of the transform
313 	 * @return A Map containing the appropriate XSLT transform parameters including items
314 	 * such as the new message Id, sender, receiver etc.
315 	 */
316 	private Map<String, String> getTransformParameters(ITKMessage itkMessage) {
317 		if (itkMessage != null && itkMessage.getMessageProperties() != null) {
318 			 ITKMessageProperties msgProps = itkMessage.getMessageProperties();
319 			 Map<String, String> params = new HashMap<String, String>();
320 			 params.put("response-msg-id", msgProps.getBusinessPayloadId());
321 			 params.put("from-address", msgProps.getFromAddress().getURI());
322 			 params.put("to-address", msgProps.getToAddress().getURI());
323 			 params.put("creation-time", HL7Utils.getHL7DateTime());
324 			 return params;
325 		}
326 		return null;
327 	}
328 	
329 	/* (non-Javadoc)
330 	 * @see java.lang.Runnable#run()
331 	 */
332 	@Override
333 	public void run() {
334 		while (this.isRunning) {
335 			try {
336 				if (this.asyncProcessingQueue.isEmpty()) {
337 					Thread.sleep(5000);
338 				} else {
339 					this.processAsyncMessage(this.asyncProcessingQueue.poll());
340 				}
341 			} catch (InterruptedException e) {
342 				this.isRunning = false;
343 			}
344 		}
345 	}
346 	
347 	/**
348 	 * Read file.
349 	 *
350 	 * @param fname the fname
351 	 * @return the string
352 	 * @throws IOException Signals that an I/O exception has occurred.
353 	 */
354 	private String readFile(String fname) throws IOException {
355 
356 	    InputStream tis = this.getClass().getResourceAsStream("/messages/"+fname);
357 	    StringBuilder fileContents = new StringBuilder();
358 	    Scanner scanner = new Scanner(tis);
359 	    String lineSeparator = System.getProperty("line.separator");
360 
361 	    try {
362 	        while(scanner.hasNextLine()) {        
363 	            fileContents.append(scanner.nextLine() + lineSeparator);
364 	        }
365 	        return fileContents.toString();
366 	    } finally {
367 	        scanner.close();
368 	    }
369 	}
370 
371 }