summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-15 17:44:17 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-15 17:44:17 +0100
commit91e45c35082e2fe847863d8e942fb1c40d970bf7 (patch)
tree8dc70a033b8f246906f49306168b1f6ac60a6089 /messagebus
parent1c26fb4de1f04337ac95dbcefb83c06d1e7327e6 (diff)
Use a single thread for sending blocked messages.
Not necessary to your own.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java29
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java16
2 files changed, 40 insertions, 5 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 91d3ba966c3..38fc9bb3e5b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -2,6 +2,7 @@
package com.yahoo.messagebus;
import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.concurrent.SystemTimer;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.metrics.MessageBusMetricSet;
import com.yahoo.messagebus.network.Network;
@@ -13,6 +14,10 @@ import com.yahoo.text.Utf8String;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
@@ -65,8 +70,31 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
private int maxPendingSize = 0;
private int pendingCount = 0;
private int pendingSize = 0;
+ private final ScheduledExecutorService trySendExecutor = new ScheduledThreadPoolExecutor(1);
+ private final ConcurrentHashMap<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<>();
private MessageBusMetricSet metrics = new MessageBusMetricSet();
+ public interface SendBlockedMessages {
+ /**
+ * Do what you want, but dont block.
+ * You will be called regularly until you signal you are done
+ * @return true unless you are done
+ */
+ boolean trySend();
+ }
+
+ public void register(SendBlockedMessages sender) {
+ blockedSenders.put(sender, SystemTimer.INSTANCE.milliTime());
+ }
+
+ private void sendBlockedMessages() {
+ for (SendBlockedMessages sender : blockedSenders.keySet()) {
+ if ( ! sender.trySend() ) {
+ blockedSenders.remove(sender);
+ }
+ }
+ }
+
/**
* <p>Convenience constructor that proxies {@link #MessageBus(Network,
* MessageBusParams)} by adding the given protocols to a default {@link
@@ -115,6 +143,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
} else {
resender = null;
}
+ trySendExecutor.scheduleWithFixedDelay(this::sendBlockedMessages, 0, 10, TimeUnit.MILLISECONDS);
msn.start();
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index faf8514df93..0e8703b8a58 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -8,6 +8,7 @@ import com.yahoo.text.Utf8String;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -16,7 +17,7 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public final class SourceSession implements ReplyHandler {
+public final class SourceSession implements ReplyHandler, MessageBus.SendBlockedMessages {
private static Logger log = Logger.getLogger(SourceSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -30,7 +31,6 @@ public final class SourceSession implements ReplyHandler {
private volatile int pendingCount = 0;
private volatile boolean closed = false;
private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
- private final Thread blockedMessageSender;
/**
* <p>The default constructor requires values for all final member variables
@@ -51,9 +51,7 @@ public final class SourceSession implements ReplyHandler {
replyHandler = params.getReplyHandler();
throttlePolicy = params.getThrottlePolicy();
timeout = params.getTimeout();
- blockedMessageSender = new Thread(this::blockedSendLoop);
- blockedMessageSender.setDaemon(true);
- blockedMessageSender.start();
+ mbus.register(this);
}
@Override
@@ -181,6 +179,14 @@ public final class SourceSession implements ReplyHandler {
expireStalledBlockedMessages();
}
+ @Override
+ public boolean trySend() {
+ if (closed) return false;
+ sendBlockedMessages();
+ expireStalledBlockedMessages();
+ return true;
+ }
+
private class BlockedMessage {
private final Message msg;
private Result result = null;