diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-14 09:07:04 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-14 09:07:04 +0200 |
commit | 85962336458e1a33aa5f0f0130dc018d4bee7d72 (patch) | |
tree | 53bd67071186bb84d2581f8cc34b9a9f41f0a3d4 | |
parent | 4932df60a5ead7cdc58a236663a58afdd936fb1d (diff) |
Add control for if you want to use mbus threas for compression/encoding or your own.
4 files changed, 20 insertions, 3 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 113d99f77f9..cd07a4b1f67 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -120,7 +120,8 @@ public final class SessionCache extends AbstractComponent { RPCNetworkParams netParams = new RPCNetworkParams() .setSlobrokConfigId(slobrokConfigId) .setIdentity(new Identity(identity)) - .setListenPort(mbusConfig.port()); + .setListenPort(mbusConfig.port()) + .setSendInOwnThread(true); return SharedMessageBus.newInstance(mbusParams, netParams); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index fdd72c12532..ac28f0247a2 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -56,6 +56,7 @@ public class RPCNetwork implements Network, MethodHandler { private static final Logger log = Logger.getLogger(RPCNetwork.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final Identity identity; + private final boolean useOwnThread; private final Supervisor orb; private final RPCTargetPool targetPool; private final RPCServicePool servicePool; @@ -83,6 +84,7 @@ public class RPCNetwork implements Network, MethodHandler { public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { this.slobroksConfig = slobrokConfig; identity = params.getIdentity(); + useOwnThread = params.getSendInOwnThread(); orb = new Supervisor(new Transport()); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); @@ -257,7 +259,12 @@ public class RPCNetwork implements Network, MethodHandler { String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", buildRecipientListString(ctx), identity.getHostname())); } else { - executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); + SendTask sendTask = new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx); + if ( ! useOwnThread ) { + executor.execute(sendTask); + } else { + sendTask.run(); + } } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index e50670530d3..ff3336ac37e 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -15,6 +15,7 @@ public class RPCNetworkParams { private Identity identity = new Identity(""); private String slobrokConfigId = "admin/slobrok.0"; private SlobroksConfig slobroksConfig = null; + private boolean sendInOwnThread = false; private int listenPort = 0; private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; @@ -34,6 +35,7 @@ public class RPCNetworkParams { */ public RPCNetworkParams(RPCNetworkParams params) { identity = new Identity(params.identity); + sendInOwnThread = params.sendInOwnThread; slobrokConfigId = params.slobrokConfigId; slobroksConfig = params.slobroksConfig; listenPort = params.listenPort; @@ -62,6 +64,13 @@ public class RPCNetworkParams { return this; } + public boolean getSendInOwnThread() { return sendInOwnThread; } + + public RPCNetworkParams setSendInOwnThread(boolean sendInOwnThread) { + this.sendInOwnThread = sendInOwnThread; + return this; + } + /** * Returns the config id of the slobrok config. * diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java index b1fd8e8cffa..3f0d167a12a 100755 --- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java +++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java @@ -71,7 +71,7 @@ public class DummyReceiver implements MessageHandler { private void init() { MessageBusParams params = new MessageBusParams(new LoadTypeSet()); - params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name))); + params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)).setSendInOwnThread(true)); params.setDocumentManagerConfigId("client"); params.getMessageBusParams().setMaxPendingCount(0); params.getMessageBusParams().setMaxPendingSize(0); |