diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-15 19:12:48 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-15 19:12:48 +0100 |
commit | d01f32a695788b18c891aad8d462dc261cc5c260 (patch) | |
tree | 0239a7c2233aac13e726683f8201fecace1a3026 /messagebus | |
parent | 91e45c35082e2fe847863d8e942fb1c40d970bf7 (diff) |
Better to just use a single thread.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 38fc9bb3e5b..ae95a52bc38 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -15,9 +15,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; @@ -70,7 +67,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, private int maxPendingSize = 0; private int pendingCount = 0; private int pendingSize = 0; - private final ScheduledExecutorService trySendExecutor = new ScheduledThreadPoolExecutor(1); + private final Thread careTaker = new Thread(this::sendBlockedMessages); private final ConcurrentHashMap<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<>(); private MessageBusMetricSet metrics = new MessageBusMetricSet(); @@ -88,9 +85,17 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } private void sendBlockedMessages() { - for (SendBlockedMessages sender : blockedSenders.keySet()) { - if ( ! sender.trySend() ) { - blockedSenders.remove(sender); + while (! destroyed.get()) { + for (SendBlockedMessages sender : blockedSenders.keySet()) { + if (!sender.trySend()) { + blockedSenders.remove(sender); + } + } + try { + + Thread.sleep(10); + } catch (InterruptedException e) { + return; } } } @@ -143,7 +148,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } else { resender = null; } - trySendExecutor.scheduleWithFixedDelay(this::sendBlockedMessages, 0, 10, TimeUnit.MILLISECONDS); + careTaker.setDaemon(true); + careTaker.start(); msn.start(); } @@ -178,6 +184,9 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, */ public boolean destroy() { if (!destroyed.getAndSet(true)) { + try { + careTaker.join(); + } catch (InterruptedException e) { } protocolRepository.clearPolicyCache(); net.shutdown(); msn.destroy(); |