diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-14 19:54:23 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-14 19:54:23 +0200 |
commit | 1013b6c5e98b21bf50e968f1cd71e40091021435 (patch) | |
tree | e606fb153bb433be2bfc57a98ed1a214765f0094 /messagebus | |
parent | 7f3811147582723463eac67cc3416855f5138061 (diff) |
Revert "Balder/skip the messenger thread"
Diffstat (limited to 'messagebus')
3 files changed, 58 insertions, 24 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 7211e4cead0..63e5dbb2d04 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -8,6 +9,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -23,6 +27,7 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); + private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -64,8 +69,13 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - } else { - handler.handleMessage(msg); + return; + } + try { + sendExecutor.execute(new MessageTask(msg, handler)); + } catch (RejectedExecutionException e) { + msg.discard(); + log.warning("Execution rejected " + e.getMessage()); } } @@ -78,11 +88,7 @@ public class Messenger implements Runnable { * @param handler The handler to return to. */ public void deliverReply(final Reply reply, final ReplyHandler handler) { - if (destroyed.get()) { - reply.discard(); - } else { - handler.handleReply(reply); - } + enqueue(new ReplyTask(reply, handler)); } /** @@ -130,6 +136,7 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { + sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -212,6 +219,49 @@ public class Messenger implements Runnable { void destroy(); } + private static class MessageTask implements Runnable { + + final MessageHandler handler; + Message msg; + + MessageTask(final Message msg, final MessageHandler handler) { + this.msg = msg; + this.handler = handler; + } + + @Override + public void run() { + final Message msg = this.msg; + this.msg = null; + handler.handleMessage(msg); + } + } + + private static class ReplyTask implements Task { + + final ReplyHandler handler; + Reply reply; + + ReplyTask(final Reply reply, final ReplyHandler handler) { + this.reply = reply; + this.handler = handler; + } + + @Override + public void run() { + final Reply reply = this.reply; + this.reply = null; + handler.handleReply(reply); + } + + @Override + public void destroy() { + if (reply != null) { + reply.discard(); + } + } + } + private static class SyncTask implements Task { final CountDownLatch latch = new CountDownLatch(1); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index ac28f0247a2..fdd72c12532 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -56,7 +56,6 @@ public class RPCNetwork implements Network, MethodHandler { private static final Logger log = Logger.getLogger(RPCNetwork.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final Identity identity; - private final boolean useOwnThread; private final Supervisor orb; private final RPCTargetPool targetPool; private final RPCServicePool servicePool; @@ -84,7 +83,6 @@ public class RPCNetwork implements Network, MethodHandler { public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { this.slobroksConfig = slobrokConfig; identity = params.getIdentity(); - useOwnThread = params.getSendInOwnThread(); orb = new Supervisor(new Transport()); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); @@ -259,12 +257,7 @@ public class RPCNetwork implements Network, MethodHandler { String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx), identity.getHostname())); } else { - SendTask sendTask = new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx); - if ( ! useOwnThread ) { - executor.execute(sendTask); - } else { - sendTask.run(); - } + executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index ff3336ac37e..e50670530d3 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -15,7 +15,6 @@ public class RPCNetworkParams { private Identity identity = new Identity(""); private String slobrokConfigId = "admin/slobrok.0"; private SlobroksConfig slobroksConfig = null; - private boolean sendInOwnThread = false; private int listenPort = 0; private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; @@ -35,7 +34,6 @@ public class RPCNetworkParams { */ public RPCNetworkParams(RPCNetworkParams params) { identity = new Identity(params.identity); - sendInOwnThread = params.sendInOwnThread; slobrokConfigId = params.slobrokConfigId; slobroksConfig = params.slobroksConfig; listenPort = params.listenPort; @@ -64,13 +62,6 @@ public class RPCNetworkParams { return this; } - public boolean getSendInOwnThread() { return sendInOwnThread; } - - public RPCNetworkParams setSendInOwnThread(boolean sendInOwnThread) { - this.sendInOwnThread = sendInOwnThread; - return this; - } - /** * Returns the config id of the slobrok config. * |