diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-15 17:44:17 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-15 17:44:17 +0100 |
commit | 91e45c35082e2fe847863d8e942fb1c40d970bf7 (patch) | |
tree | 8dc70a033b8f246906f49306168b1f6ac60a6089 /messagebus | |
parent | 1c26fb4de1f04337ac95dbcefb83c06d1e7327e6 (diff) |
Use a single thread for sending blocked messages.
Not necessary to your own.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java | 29 | ||||
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 16 |
2 files changed, 40 insertions, 5 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 91d3ba966c3..38fc9bb3e5b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -2,6 +2,7 @@ package com.yahoo.messagebus; import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.concurrent.SystemTimer; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.metrics.MessageBusMetricSet; import com.yahoo.messagebus.network.Network; @@ -13,6 +14,10 @@ import com.yahoo.text.Utf8String; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; @@ -65,8 +70,31 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, private int maxPendingSize = 0; private int pendingCount = 0; private int pendingSize = 0; + private final ScheduledExecutorService trySendExecutor = new ScheduledThreadPoolExecutor(1); + private final ConcurrentHashMap<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<>(); private MessageBusMetricSet metrics = new MessageBusMetricSet(); + public interface SendBlockedMessages { + /** + * Do what you want, but dont block. + * You will be called regularly until you signal you are done + * @return true unless you are done + */ + boolean trySend(); + } + + public void register(SendBlockedMessages sender) { + blockedSenders.put(sender, SystemTimer.INSTANCE.milliTime()); + } + + private void sendBlockedMessages() { + for (SendBlockedMessages sender : blockedSenders.keySet()) { + if ( ! sender.trySend() ) { + blockedSenders.remove(sender); + } + } + } + /** * <p>Convenience constructor that proxies {@link #MessageBus(Network, * MessageBusParams)} by adding the given protocols to a default {@link @@ -115,6 +143,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } else { resender = null; } + trySendExecutor.scheduleWithFixedDelay(this::sendBlockedMessages, 0, 10, TimeUnit.MILLISECONDS); msn.start(); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index faf8514df93..0e8703b8a58 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -8,6 +8,7 @@ import com.yahoo.text.Utf8String; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -16,7 +17,7 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public final class SourceSession implements ReplyHandler { +public final class SourceSession implements ReplyHandler, MessageBus.SendBlockedMessages { private static Logger log = Logger.getLogger(SourceSession.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -30,7 +31,6 @@ public final class SourceSession implements ReplyHandler { private volatile int pendingCount = 0; private volatile boolean closed = false; private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); - private final Thread blockedMessageSender; /** * <p>The default constructor requires values for all final member variables @@ -51,9 +51,7 @@ public final class SourceSession implements ReplyHandler { replyHandler = params.getReplyHandler(); throttlePolicy = params.getThrottlePolicy(); timeout = params.getTimeout(); - blockedMessageSender = new Thread(this::blockedSendLoop); - blockedMessageSender.setDaemon(true); - blockedMessageSender.start(); + mbus.register(this); } @Override @@ -181,6 +179,14 @@ public final class SourceSession implements ReplyHandler { expireStalledBlockedMessages(); } + @Override + public boolean trySend() { + if (closed) return false; + sendBlockedMessages(); + expireStalledBlockedMessages(); + return true; + } + private class BlockedMessage { private final Message msg; private Result result = null; |