diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-22 22:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-22 22:27:05 +0200 |
commit | bc6eaee234cd383289cec4959f19ce91c08a6559 (patch) | |
tree | 2fb4665efb2f3d19d9f414a13f2fa5a167d1a22d | |
parent | 931b9fb8422b29e74319e91bd7b72d9c8c596871 (diff) | |
parent | 6e616650a840f03b518c53f85eec5211c72077f9 (diff) |
Merge pull request #7385 from vespa-engine/balder/separate-send-reply-thread
To avoid replies being affected by send use 2 threads.
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/Messenger.java | 27 | ||||
-rwxr-xr-x | messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java | 4 |
2 files changed, 20 insertions, 11 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 6ef5e9adca7..4fb83386231 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -8,6 +9,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -23,7 +27,9 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); + private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); + private final Thread thread = new Thread(this, "Messenger"); public Messenger() { @@ -61,7 +67,16 @@ public class Messenger implements Runnable { * @param handler The handler to send to. */ public void deliverMessage(final Message msg, final MessageHandler handler) { - enqueue(new MessageTask(msg, handler)); + if (destroyed.get()) { + msg.discard(); + return; + } + try { + sendExecutor.execute(new MessageTask(msg, handler)); + } catch (RejectedExecutionException e) { + msg.discard(); + log.warning("Execution rejected " + e.getMessage()); + } } /** @@ -121,6 +136,7 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { + sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -203,7 +219,7 @@ public class Messenger implements Runnable { public void destroy(); } - private static class MessageTask implements Task { + private static class MessageTask implements Runnable { final MessageHandler handler; Message msg; @@ -219,13 +235,6 @@ public class Messenger implements Runnable { this.msg = null; handler.handleMessage(msg); } - - @Override - public void destroy() { - if (msg != null) { - msg.discard(); - } - } } private static class ReplyTask implements Task { diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java index 0dcc1dd67fd..3049639586d 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java @@ -183,7 +183,7 @@ public class RoutingNode implements ReplyHandler { } /** - * This method mergs this node as ready for merge. If it has a parent routing node, its pending member is + * This method markss this node as ready for merge. If it has a parent routing node, its pending member is * decremented. If this causes the parent's pending count to reach zero, its {@link #notifyMerge()} method is * invoked. A special flag is used to make sure that failed resending avoids notifying parents of previously * resolved branches of the tree. @@ -205,7 +205,7 @@ public class RoutingNode implements ReplyHandler { /** * This method merges the content of all its children, and invokes itself on the parent node. If not all children - * are ready for merg, this method does nothing. The rationale for this is that the last child to receive a reply + * are ready for merge, this method does nothing. The rationale for this is that the last child to receive a reply * will propagate the merge upwards. Once this method reaches the root node, the reply is either scheduled for * resending or passed to the owning reply handler. */ |