aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-22 22:27:05 +0200
committerGitHub <noreply@github.com>2018-10-22 22:27:05 +0200
commitbc6eaee234cd383289cec4959f19ce91c08a6559 (patch)
tree2fb4665efb2f3d19d9f414a13f2fa5a167d1a22d
parent931b9fb8422b29e74319e91bd7b72d9c8c596871 (diff)
parent6e616650a840f03b518c53f85eec5211c72077f9 (diff)
Merge pull request #7385 from vespa-engine/balder/separate-send-reply-thread
To avoid replies being affected by send use 2 threads.
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java27
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java4
2 files changed, 20 insertions, 11 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index 6ef5e9adca7..4fb83386231 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.log.LogLevel;
import java.util.ArrayDeque;
@@ -8,6 +9,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -23,7 +27,9 @@ public class Messenger implements Runnable {
private static final Logger log = Logger.getLogger(Messenger.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final List<Task> children = new ArrayList<>();
+ private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send"));
private final Queue<Task> queue = new ArrayDeque<>();
+
private final Thread thread = new Thread(this, "Messenger");
public Messenger() {
@@ -61,7 +67,16 @@ public class Messenger implements Runnable {
* @param handler The handler to send to.
*/
public void deliverMessage(final Message msg, final MessageHandler handler) {
- enqueue(new MessageTask(msg, handler));
+ if (destroyed.get()) {
+ msg.discard();
+ return;
+ }
+ try {
+ sendExecutor.execute(new MessageTask(msg, handler));
+ } catch (RejectedExecutionException e) {
+ msg.discard();
+ log.warning("Execution rejected " + e.getMessage());
+ }
}
/**
@@ -121,6 +136,7 @@ public class Messenger implements Runnable {
boolean done = false;
enqueue(Terminate.INSTANCE);
if (!destroyed.getAndSet(true)) {
+ sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();});
try {
synchronized (this) {
while (!queue.isEmpty()) {
@@ -203,7 +219,7 @@ public class Messenger implements Runnable {
public void destroy();
}
- private static class MessageTask implements Task {
+ private static class MessageTask implements Runnable {
final MessageHandler handler;
Message msg;
@@ -219,13 +235,6 @@ public class Messenger implements Runnable {
this.msg = null;
handler.handleMessage(msg);
}
-
- @Override
- public void destroy() {
- if (msg != null) {
- msg.discard();
- }
- }
}
private static class ReplyTask implements Task {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
index 0dcc1dd67fd..3049639586d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
@@ -183,7 +183,7 @@ public class RoutingNode implements ReplyHandler {
}
/**
- * This method mergs this node as ready for merge. If it has a parent routing node, its pending member is
+ * This method markss this node as ready for merge. If it has a parent routing node, its pending member is
* decremented. If this causes the parent's pending count to reach zero, its {@link #notifyMerge()} method is
* invoked. A special flag is used to make sure that failed resending avoids notifying parents of previously
* resolved branches of the tree.
@@ -205,7 +205,7 @@ public class RoutingNode implements ReplyHandler {
/**
* This method merges the content of all its children, and invokes itself on the parent node. If not all children
- * are ready for merg, this method does nothing. The rationale for this is that the last child to receive a reply
+ * are ready for merge, this method does nothing. The rationale for this is that the last child to receive a reply
* will propagate the merge upwards. Once this method reaches the root node, the reply is either scheduled for
* resending or passed to the owning reply handler.
*/