diff options
5 files changed, 27 insertions, 60 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 113d99f77f9..cd07a4b1f67 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -120,7 +120,8 @@ public final class SessionCache extends AbstractComponent { RPCNetworkParams netParams = new RPCNetworkParams() .setSlobrokConfigId(slobrokConfigId) .setIdentity(new Identity(identity)) - .setListenPort(mbusConfig.port()); + .setListenPort(mbusConfig.port()) + .setSendInOwnThread(true); return SharedMessageBus.newInstance(mbusParams, netParams); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 63e5dbb2d04..7211e4cead0 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -9,9 +8,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -27,7 +23,6 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); - private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -69,13 +64,8 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - return; - } - try { - sendExecutor.execute(new MessageTask(msg, handler)); - } catch (RejectedExecutionException e) { - msg.discard(); - log.warning("Execution rejected " + e.getMessage()); + } else { + handler.handleMessage(msg); } } @@ -88,7 +78,11 @@ public class Messenger implements Runnable { * @param handler The handler to return to. */ public void deliverReply(final Reply reply, final ReplyHandler handler) { - enqueue(new ReplyTask(reply, handler)); + if (destroyed.get()) { + reply.discard(); + } else { + handler.handleReply(reply); + } } /** @@ -136,7 +130,6 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { - sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -219,49 +212,6 @@ public class Messenger implements Runnable { void destroy(); } - private static class MessageTask implements Runnable { - - final MessageHandler handler; - Message msg; - - MessageTask(final Message msg, final MessageHandler handler) { - this.msg = msg; - this.handler = handler; - } - - @Override - public void run() { - final Message msg = this.msg; - this.msg = null; - handler.handleMessage(msg); - } - } - - private static class ReplyTask implements Task { - - final ReplyHandler handler; - Reply reply; - - ReplyTask(final Reply reply, final ReplyHandler handler) { - this.reply = reply; - this.handler = handler; - } - - @Override - public void run() { - final Reply reply = this.reply; - this.reply = null; - handler.handleReply(reply); - } - - @Override - public void destroy() { - if (reply != null) { - reply.discard(); - } - } - } - private static class SyncTask implements Task { final CountDownLatch latch = new CountDownLatch(1); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index fdd72c12532..ac28f0247a2 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -56,6 +56,7 @@ public class RPCNetwork implements Network, MethodHandler { private static final Logger log = Logger.getLogger(RPCNetwork.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final Identity identity; + private final boolean useOwnThread; private final Supervisor orb; private final RPCTargetPool targetPool; private final RPCServicePool servicePool; @@ -83,6 +84,7 @@ public class RPCNetwork implements Network, MethodHandler { public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { this.slobroksConfig = slobrokConfig; identity = params.getIdentity(); + useOwnThread = params.getSendInOwnThread(); orb = new Supervisor(new Transport()); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); @@ -257,7 +259,12 @@ public class RPCNetwork implements Network, MethodHandler { String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx), identity.getHostname())); } else { - executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); + SendTask sendTask = new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx); + if ( ! useOwnThread ) { + executor.execute(sendTask); + } else { + sendTask.run(); + } } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index e50670530d3..ff3336ac37e 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -15,6 +15,7 @@ public class RPCNetworkParams { private Identity identity = new Identity(""); private String slobrokConfigId = "admin/slobrok.0"; private SlobroksConfig slobroksConfig = null; + private boolean sendInOwnThread = false; private int listenPort = 0; private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; @@ -34,6 +35,7 @@ public class RPCNetworkParams { */ public RPCNetworkParams(RPCNetworkParams params) { identity = new Identity(params.identity); + sendInOwnThread = params.sendInOwnThread; slobrokConfigId = params.slobrokConfigId; slobroksConfig = params.slobroksConfig; listenPort = params.listenPort; @@ -62,6 +64,13 @@ public class RPCNetworkParams { return this; } + public boolean getSendInOwnThread() { return sendInOwnThread; } + + public RPCNetworkParams setSendInOwnThread(boolean sendInOwnThread) { + this.sendInOwnThread = sendInOwnThread; + return this; + } + /** * Returns the config id of the slobrok config. * diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java index b1fd8e8cffa..3f0d167a12a 100755 --- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java +++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java @@ -71,7 +71,7 @@ public class DummyReceiver implements MessageHandler { private void init() { MessageBusParams params = new MessageBusParams(new LoadTypeSet()); - params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name))); + params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)).setSendInOwnThread(true)); params.setDocumentManagerConfigId("client"); params.getMessageBusParams().setMaxPendingCount(0); params.getMessageBusParams().setMaxPendingSize(0); |