summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-14 09:07:04 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-14 09:07:04 +0200
commit85962336458e1a33aa5f0f0130dc018d4bee7d72 (patch)
tree53bd67071186bb84d2581f8cc34b9a9f41f0a3d4
parent4932df60a5ead7cdc58a236663a58afdd936fb1d (diff)
Add control for if you want to use mbus threas for compression/encoding or your own.
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java3
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java9
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java9
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java2
4 files changed, 20 insertions, 3 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 113d99f77f9..cd07a4b1f67 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -120,7 +120,8 @@ public final class SessionCache extends AbstractComponent {
RPCNetworkParams netParams = new RPCNetworkParams()
.setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port());
+ .setListenPort(mbusConfig.port())
+ .setSendInOwnThread(true);
return SharedMessageBus.newInstance(mbusParams, netParams);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index fdd72c12532..ac28f0247a2 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -56,6 +56,7 @@ public class RPCNetwork implements Network, MethodHandler {
private static final Logger log = Logger.getLogger(RPCNetwork.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final Identity identity;
+ private final boolean useOwnThread;
private final Supervisor orb;
private final RPCTargetPool targetPool;
private final RPCServicePool servicePool;
@@ -83,6 +84,7 @@ public class RPCNetwork implements Network, MethodHandler {
public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
this.slobroksConfig = slobrokConfig;
identity = params.getIdentity();
+ useOwnThread = params.getSendInOwnThread();
orb = new Supervisor(new Transport());
orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());
@@ -257,7 +259,12 @@ public class RPCNetwork implements Network, MethodHandler {
String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
buildRecipientListString(ctx), identity.getHostname()));
} else {
- executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
+ SendTask sendTask = new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx);
+ if ( ! useOwnThread ) {
+ executor.execute(sendTask);
+ } else {
+ sendTask.run();
+ }
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
index e50670530d3..ff3336ac37e 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
@@ -15,6 +15,7 @@ public class RPCNetworkParams {
private Identity identity = new Identity("");
private String slobrokConfigId = "admin/slobrok.0";
private SlobroksConfig slobroksConfig = null;
+ private boolean sendInOwnThread = false;
private int listenPort = 0;
private int maxInputBufferSize = 256 * 1024;
private int maxOutputBufferSize = 256 * 1024;
@@ -34,6 +35,7 @@ public class RPCNetworkParams {
*/
public RPCNetworkParams(RPCNetworkParams params) {
identity = new Identity(params.identity);
+ sendInOwnThread = params.sendInOwnThread;
slobrokConfigId = params.slobrokConfigId;
slobroksConfig = params.slobroksConfig;
listenPort = params.listenPort;
@@ -62,6 +64,13 @@ public class RPCNetworkParams {
return this;
}
+ public boolean getSendInOwnThread() { return sendInOwnThread; }
+
+ public RPCNetworkParams setSendInOwnThread(boolean sendInOwnThread) {
+ this.sendInOwnThread = sendInOwnThread;
+ return this;
+ }
+
/**
* Returns the config id of the slobrok config.
*
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
index b1fd8e8cffa..3f0d167a12a 100755
--- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -71,7 +71,7 @@ public class DummyReceiver implements MessageHandler {
private void init() {
MessageBusParams params = new MessageBusParams(new LoadTypeSet());
- params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)));
+ params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)).setSendInOwnThread(true));
params.setDocumentManagerConfigId("client");
params.getMessageBusParams().setMaxPendingCount(0);
params.getMessageBusParams().setMaxPendingSize(0);