1
2
3
4
5
6
7
8
9
10
11
12
13
14 package uk.nhs.interoperability.source;
15
16 import java.io.IOException;
17
18 import javax.xml.bind.DatatypeConverter;
19 import javax.xml.parsers.ParserConfigurationException;
20 import javax.xml.xpath.XPathConstants;
21 import javax.xml.xpath.XPathExpressionException;
22
23 import org.w3c.dom.Document;
24 import org.w3c.dom.Node;
25 import org.xml.sax.SAXException;
26
27 import uk.nhs.interoperability.capabilities.DirectoryOfServices;
28 import uk.nhs.interoperability.infrastructure.ITKAddress;
29 import uk.nhs.interoperability.infrastructure.ITKCommsException;
30 import uk.nhs.interoperability.infrastructure.ITKIdentityImpl;
31 import uk.nhs.interoperability.infrastructure.ITKMessageProperties;
32 import uk.nhs.interoperability.infrastructure.ITKMessagePropertiesImpl;
33 import uk.nhs.interoperability.infrastructure.ITKMessagingException;
34 import uk.nhs.interoperability.payload.DEWrappedMessage;
35 import uk.nhs.interoperability.payload.ITKMessage;
36 import uk.nhs.interoperability.payload.SimpleMessage;
37 import uk.nhs.interoperability.service.ITKService;
38 import uk.nhs.interoperability.service.ITKSimpleDOS;
39 import uk.nhs.interoperability.transport.ITKSender;
40 import uk.nhs.interoperability.transport.ITKTransportRoute;
41 import uk.nhs.interoperability.transport.WS.ITKSenderWSImpl;
42 import uk.nhs.interoperability.util.Logger;
43 import uk.nhs.interoperability.util.xml.DomUtils;
44 import uk.nhs.interoperability.util.xml.XPaths;
45
46
47
48
49
50
51
52
53 public class ITKMessageSenderImpl implements ITKMessageSender {
54
55
56 private DirectoryOfServices directoryOfServices;
57
58
59
60
61 public ITKMessageSenderImpl() {
62 this.directoryOfServices = new ITKSimpleDOS();
63 }
64
65
66
67
68
69 @Override
70 public void send(ITKMessage request) throws ITKMessagingException {
71
72
73 String serviceId = request.getMessageProperties().getServiceId();
74 ITKService service = directoryOfServices.getService(serviceId);
75 if (service==null){
76 Logger.trace("Invalid Service");
77 throw new ITKMessagingException("Service " + serviceId + " is not a configured service (see service.properties).");
78 }
79
80 ITKTransportRoute route = getRoute(request);
81
82 ITKSender sender = getSender(route);
83
84 ITKMessage message = buildMessage(request, route, service);
85
86 Logger.trace("Sending via WS: " + message.getFullPayload());
87
88 sender.send(route, message);
89
90 }
91
92
93
94
95 @Override
96 public ITKMessage sendSync(ITKMessage request) throws ITKMessagingException {
97
98
99 String serviceId = request.getMessageProperties().getServiceId();
100 ITKService service = directoryOfServices.getService(serviceId);
101 if (service==null){
102 Logger.trace("Invalid Service");
103 throw new ITKMessagingException("Service " + serviceId + " is not a configured service (see service.properties).");
104 }
105
106 if (!service.supportsSync()){
107 Logger.trace("Invalid Service Call");
108 throw new ITKMessagingException("Service " + service.getServiceId() + " can not be called Synchronously");
109 }
110
111 ITKTransportRoute route = getRoute(request);
112
113 ITKSender sender = getSender(route);
114
115 ITKMessage message = buildMessage(request, route, service);
116
117 Logger.trace("Sending via configured transport: " + message.getFullPayload());
118 ITKMessage response = sender.sendSync(route, message);
119
120 ITKMessage simpleResponse = buildResponse(response);
121
122 return simpleResponse;
123
124 }
125
126
127
128
129 @Override
130 public void sendAsync(ITKMessage request) throws ITKMessagingException {
131
132
133 String serviceId = request.getMessageProperties().getServiceId();
134 ITKService service = directoryOfServices.getService(serviceId);
135 if (service==null){
136 Logger.trace("Invalid Service");
137 throw new ITKMessagingException("Service " + serviceId + " is not a configured service (see service.properties).");
138 }
139
140 if (!service.supportsAsync()){
141 Logger.trace("Invalid Service Call");
142 throw new ITKMessagingException("Service " + service.getServiceId() + " can not be called Asynchronously");
143 }
144
145 ITKTransportRoute route = getRoute(request);
146
147 ITKSender sender = getSender(route);
148
149 ITKMessage message = buildMessage(request, route, service);
150
151 Logger.trace("Sending via configured transport: " + message.getFullPayload());
152 sender.sendAysnc(route, message);
153
154 }
155
156
157
158
159
160
161
162
163 private ITKTransportRoute getRoute(ITKMessage message) throws ITKMessagingException {
164
165 String serviceId = message.getMessageProperties().getServiceId();
166
167
168 ITKTransportRoute route = message.getPreresolvedRoute();
169 ITKAddress toAddress = message.getMessageProperties().getToAddress();
170
171
172
173
174 if (route == null) {
175
176 Logger.trace("TransportRoute has not been pre-resolved - looking up");
177
178
179 route = directoryOfServices.resolveDestination(serviceId, toAddress);
180
181 } else {
182
183 Logger.debug("Pre-resolved transport route on object " + message.getClass().getSimpleName() + " is " + route);
184
185 }
186
187 if (route == null){
188 String msgStr = "No route found for service (" + serviceId + ") and recipient ("+ toAddress + ")";
189 Logger.trace(msgStr);
190 throw new ITKMessagingException(msgStr);
191 } else {
192
193 Logger.debug("Found transport route:" + route);
194
195 }
196
197
198 return route;
199 }
200
201
202
203
204
205
206
207
208
209 private ITKMessage buildMessage(ITKMessage request, ITKTransportRoute route, ITKService service){
210 ITKMessage message = null;
211
212
213
214 if (route.getWrapperType().equals(ITKTransportRoute.DISTRIBUTION_ENVELOPE)){
215 Logger.trace("Adding distribution envelope wrapper");
216
217 if (service.isBase64()) {
218 String b64DocText = DatatypeConverter.printBase64Binary(request.getBusinessPayload().getBytes());
219 request.setBusinessPayload(b64DocText);
220 }
221 message = new DEWrappedMessage(service, request, DEWrappedMessage.DE_ADDRESSED);
222 } else {
223 Logger.trace("No DE wrapper required");
224 message = request;
225 }
226 return message;
227 }
228
229
230
231
232
233
234
235
236 private ITKSender getSender(ITKTransportRoute route) throws ITKMessagingException {
237
238 ITKSender sender = null;
239
240 if (route.getTransportType().equals(ITKTransportRoute.HTTP_WS)){
241 sender = new ITKSenderWSImpl();
242 }
243
244 if (sender == null){
245 Logger.trace("No Transport Sender Found");
246 throw new ITKMessagingException("No transport implementation found for configured route");
247 }
248
249 return sender;
250 }
251
252
253
254
255
256
257
258
259
260 private ITKMessage buildResponse(ITKMessage response) throws ITKMessagingException {
261
262 ITKMessage simpleResponse = new SimpleMessage();
263 ITKMessageProperties itkMessageProperties = null;
264
265 try {
266
267 String responsePayloadString = "";
268 Document responseDoc = DomUtils.parse(response.getBusinessPayload());
269 String responsePayloadName = responseDoc.getDocumentElement().getLocalName();
270 Logger.trace("Received:"+responsePayloadName);
271
272 if (responsePayloadName.equalsIgnoreCase("DistributionEnvelope")){
273 Logger.trace("Processing DistributionEnvelope");
274
275
276 itkMessageProperties = ITKMessagePropertiesImpl.build(responseDoc);
277
278
279 String mimetype = (String)XPaths.ITK_FIRST_MIMETYPE_XPATH.evaluate(responseDoc);
280 Logger.debug("MimeType: " + mimetype);
281
282 if (mimetype.equalsIgnoreCase("text/plain")){
283
284 String payload = (String)XPaths.ITK_FIRST_PAYLOAD_TEXT_XPATH.evaluate(responseDoc);
285 responsePayloadString = payload;
286
287
288 String base64 = (String)XPaths.ITK_FIRST_BASE64_XPATH.evaluate(responseDoc);
289 Logger.debug("Base64: " + base64);
290 if (base64.equalsIgnoreCase("true")){
291 byte[] payloadBytes = javax.xml.bind.DatatypeConverter.parseBase64Binary(payload);
292 responsePayloadString = new String(payloadBytes);
293 }
294
295 } else {
296
297 Document businessPayloadDocument = DomUtils.createDocumentFromNode((Node)XPaths.ITK_FIRST_PAYLOAD_XPATH.evaluate(responseDoc, XPathConstants.NODE));
298
299 if (businessPayloadDocument == null){
300
301 ITKMessagingException unknownResponseException = new ITKMessagingException(
302 ITKMessagingException.PROCESSING_ERROR_NOT_RETRYABLE_CODE, "No payload in Distribution Envelope");
303 Logger.error("Unknown response type",unknownResponseException);
304 throw unknownResponseException;
305 }
306
307
308 String businessPayloadName = businessPayloadDocument.getDocumentElement().getLocalName();
309 Logger.trace("Business Payload Name is:"+businessPayloadName);
310 responsePayloadString = DomUtils.serialiseToXML(businessPayloadDocument);
311 }
312
313 } else if (responsePayloadName.equalsIgnoreCase("SimpleMessageResponse")){
314 Logger.trace("Processing SimpleMessageResponse");
315
316 itkMessageProperties = new ITKMessagePropertiesImpl();
317 itkMessageProperties.setAuditIdentity(new ITKIdentityImpl("NOT SPECIFIED"));
318 responsePayloadString = (String)XPaths.ITK_SIMPLE_MESSAGE_RESPONSE_CONTENT_XPATH.evaluate(responseDoc);
319
320 } else {
321 Logger.trace("Processing UNKNOWN");
322 ITKMessagingException unknownResponseException = new ITKMessagingException(
323 ITKMessagingException.PROCESSING_ERROR_NOT_RETRYABLE_CODE, "Neither DistributionEnvelope nor SimpleMessageResponse Found");
324 Logger.error("Unknown response type",unknownResponseException);
325 throw unknownResponseException;
326
327 }
328
329 simpleResponse.setBusinessPayload(responsePayloadString);
330 simpleResponse.setMessageProperties(itkMessageProperties);
331
332 } catch (SAXException se) {
333 Logger.error("SAXException processing response from Transport", se);
334 throw new ITKCommsException("XML Error Processing ITK Response");
335 } catch (IOException ioe) {
336 Logger.error("IOException on Transport", ioe);
337 throw new ITKCommsException("Transport error sending ITK Message");
338 } catch (ParserConfigurationException pce) {
339 Logger.error("ParseConfigurationException on Transport", pce);
340 throw new ITKCommsException("XML Configuration Error Processing ITK Response");
341 } catch (XPathExpressionException xpe) {
342 Logger.error("XPathExpressionException reading payload on Transport Response", xpe);
343 throw new ITKCommsException("No Payload found in ITK Response");
344 }
345
346 return simpleResponse;
347 }
348
349 }