summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-15 19:12:48 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-15 19:12:48 +0100
commitd01f32a695788b18c891aad8d462dc261cc5c260 (patch)
tree0239a7c2233aac13e726683f8201fecace1a3026 /messagebus
parent91e45c35082e2fe847863d8e942fb1c40d970bf7 (diff)
Better to just use a single thread.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java25
1 files changed, 17 insertions, 8 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 38fc9bb3e5b..ae95a52bc38 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -15,9 +15,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
@@ -70,7 +67,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
private int maxPendingSize = 0;
private int pendingCount = 0;
private int pendingSize = 0;
- private final ScheduledExecutorService trySendExecutor = new ScheduledThreadPoolExecutor(1);
+ private final Thread careTaker = new Thread(this::sendBlockedMessages);
private final ConcurrentHashMap<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<>();
private MessageBusMetricSet metrics = new MessageBusMetricSet();
@@ -88,9 +85,17 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
private void sendBlockedMessages() {
- for (SendBlockedMessages sender : blockedSenders.keySet()) {
- if ( ! sender.trySend() ) {
- blockedSenders.remove(sender);
+ while (! destroyed.get()) {
+ for (SendBlockedMessages sender : blockedSenders.keySet()) {
+ if (!sender.trySend()) {
+ blockedSenders.remove(sender);
+ }
+ }
+ try {
+
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ return;
}
}
}
@@ -143,7 +148,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
} else {
resender = null;
}
- trySendExecutor.scheduleWithFixedDelay(this::sendBlockedMessages, 0, 10, TimeUnit.MILLISECONDS);
+ careTaker.setDaemon(true);
+ careTaker.start();
msn.start();
}
@@ -178,6 +184,9 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
*/
public boolean destroy() {
if (!destroyed.getAndSet(true)) {
+ try {
+ careTaker.join();
+ } catch (InterruptedException e) { }
protocolRepository.clearPolicyCache();
net.shutdown();
msn.destroy();