1
2
3
4
5
6
7
8
9
10
11
12
13
14 package uk.nhs.interoperability.util;
15
16 import java.util.Queue;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.LinkedBlockingQueue;
20
21 import uk.nhs.interoperability.infrastructure.ITKMessagingException;
22 import uk.nhs.interoperability.payload.ITKMessage;
23 import uk.nhs.interoperability.source.ITKMessageSender;
24 import uk.nhs.interoperability.source.ITKMessageSenderImpl;
25
26
27
28
29
30
31
32
33 public class MessageQueue implements Runnable {
34
35
36 private ExecutorService executorService;
37
38
39 private boolean isRunning = true;
40
41
42 private ITKMessageSender itkMessageSender;
43
44
45
46
47
48 private Queue<ITKMessage> asyncProcessingQueue = new LinkedBlockingQueue<ITKMessage>();
49
50
51
52
53 public MessageQueue() {
54 this.executorService = Executors.newFixedThreadPool(1);
55 this.executorService.execute(this);
56 this.itkMessageSender = new ITKMessageSenderImpl();
57 }
58
59
60
61
62
63
64 public void queue(ITKMessage request) {
65
66 this.asyncProcessingQueue.add(request);
67 }
68
69
70
71
72
73
74 private void processAsyncMessage(ITKMessage request) {
75 Logger.debug("Processing queued itk request");
76 try {
77 this.itkMessageSender.send(request);
78 } catch (ITKMessagingException e) {
79
80
81
82
83
84
85 Logger.error("Could not send aysnchronous request", e);
86 }
87 }
88
89
90
91
92 @Override
93 public void run() {
94 while (this.isRunning) {
95 try {
96 if (this.asyncProcessingQueue.isEmpty()) {
97 Thread.sleep(5000);
98 } else {
99 this.processAsyncMessage(this.asyncProcessingQueue.poll());
100 }
101 } catch (InterruptedException e) {
102 this.isRunning = false;
103 }
104 }
105 }
106
107 }