summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java3
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java64
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java9
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java9
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java2
5 files changed, 60 insertions, 27 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index cd07a4b1f67..113d99f77f9 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -120,8 +120,7 @@ public final class SessionCache extends AbstractComponent {
RPCNetworkParams netParams = new RPCNetworkParams()
.setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port())
- .setSendInOwnThread(true);
+ .setListenPort(mbusConfig.port());
return SharedMessageBus.newInstance(mbusParams, netParams);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index 7211e4cead0..63e5dbb2d04 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.log.LogLevel;
import java.util.ArrayDeque;
@@ -8,6 +9,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -23,6 +27,7 @@ public class Messenger implements Runnable {
private static final Logger log = Logger.getLogger(Messenger.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final List<Task> children = new ArrayList<>();
+ private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send"));
private final Queue<Task> queue = new ArrayDeque<>();
private final Thread thread = new Thread(this, "Messenger");
@@ -64,8 +69,13 @@ public class Messenger implements Runnable {
public void deliverMessage(final Message msg, final MessageHandler handler) {
if (destroyed.get()) {
msg.discard();
- } else {
- handler.handleMessage(msg);
+ return;
+ }
+ try {
+ sendExecutor.execute(new MessageTask(msg, handler));
+ } catch (RejectedExecutionException e) {
+ msg.discard();
+ log.warning("Execution rejected " + e.getMessage());
}
}
@@ -78,11 +88,7 @@ public class Messenger implements Runnable {
* @param handler The handler to return to.
*/
public void deliverReply(final Reply reply, final ReplyHandler handler) {
- if (destroyed.get()) {
- reply.discard();
- } else {
- handler.handleReply(reply);
- }
+ enqueue(new ReplyTask(reply, handler));
}
/**
@@ -130,6 +136,7 @@ public class Messenger implements Runnable {
boolean done = false;
enqueue(Terminate.INSTANCE);
if (!destroyed.getAndSet(true)) {
+ sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();});
try {
synchronized (this) {
while (!queue.isEmpty()) {
@@ -212,6 +219,49 @@ public class Messenger implements Runnable {
void destroy();
}
+ private static class MessageTask implements Runnable {
+
+ final MessageHandler handler;
+ Message msg;
+
+ MessageTask(final Message msg, final MessageHandler handler) {
+ this.msg = msg;
+ this.handler = handler;
+ }
+
+ @Override
+ public void run() {
+ final Message msg = this.msg;
+ this.msg = null;
+ handler.handleMessage(msg);
+ }
+ }
+
+ private static class ReplyTask implements Task {
+
+ final ReplyHandler handler;
+ Reply reply;
+
+ ReplyTask(final Reply reply, final ReplyHandler handler) {
+ this.reply = reply;
+ this.handler = handler;
+ }
+
+ @Override
+ public void run() {
+ final Reply reply = this.reply;
+ this.reply = null;
+ handler.handleReply(reply);
+ }
+
+ @Override
+ public void destroy() {
+ if (reply != null) {
+ reply.discard();
+ }
+ }
+ }
+
private static class SyncTask implements Task {
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index ac28f0247a2..fdd72c12532 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -56,7 +56,6 @@ public class RPCNetwork implements Network, MethodHandler {
private static final Logger log = Logger.getLogger(RPCNetwork.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final Identity identity;
- private final boolean useOwnThread;
private final Supervisor orb;
private final RPCTargetPool targetPool;
private final RPCServicePool servicePool;
@@ -84,7 +83,6 @@ public class RPCNetwork implements Network, MethodHandler {
public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
this.slobroksConfig = slobrokConfig;
identity = params.getIdentity();
- useOwnThread = params.getSendInOwnThread();
orb = new Supervisor(new Transport());
orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());
@@ -259,12 +257,7 @@ public class RPCNetwork implements Network, MethodHandler {
String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
buildRecipientListString(ctx), identity.getHostname()));
} else {
- SendTask sendTask = new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx);
- if ( ! useOwnThread ) {
- executor.execute(sendTask);
- } else {
- sendTask.run();
- }
+ executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
index ff3336ac37e..e50670530d3 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
@@ -15,7 +15,6 @@ public class RPCNetworkParams {
private Identity identity = new Identity("");
private String slobrokConfigId = "admin/slobrok.0";
private SlobroksConfig slobroksConfig = null;
- private boolean sendInOwnThread = false;
private int listenPort = 0;
private int maxInputBufferSize = 256 * 1024;
private int maxOutputBufferSize = 256 * 1024;
@@ -35,7 +34,6 @@ public class RPCNetworkParams {
*/
public RPCNetworkParams(RPCNetworkParams params) {
identity = new Identity(params.identity);
- sendInOwnThread = params.sendInOwnThread;
slobrokConfigId = params.slobrokConfigId;
slobroksConfig = params.slobroksConfig;
listenPort = params.listenPort;
@@ -64,13 +62,6 @@ public class RPCNetworkParams {
return this;
}
- public boolean getSendInOwnThread() { return sendInOwnThread; }
-
- public RPCNetworkParams setSendInOwnThread(boolean sendInOwnThread) {
- this.sendInOwnThread = sendInOwnThread;
- return this;
- }
-
/**
* Returns the config id of the slobrok config.
*
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
index 3f0d167a12a..b1fd8e8cffa 100755
--- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -71,7 +71,7 @@ public class DummyReceiver implements MessageHandler {
private void init() {
MessageBusParams params = new MessageBusParams(new LoadTypeSet());
- params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)).setSendInOwnThread(true));
+ params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)));
params.setDocumentManagerConfigId("client");
params.getMessageBusParams().setMaxPendingCount(0);
params.getMessageBusParams().setMaxPendingSize(0);