summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-26 19:15:31 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-26 19:15:31 +0200
commitbcedec03b38d63db550338b7726da242a6bad949 (patch)
tree2c3d828bfabf6fc1e243157c2cb6bc886291fa3b /messagebus
parent6796195b37f351f843eea4992d2d45b7ba4eb771 (diff)
Use the threadpool for handling incomming requests and replies.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java10
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java8
2 files changed, 15 insertions, 3 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 96a0a5fecef..bb0b7bdd878 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -67,7 +67,7 @@ public class RPCNetwork implements Network, MethodHandler {
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
- private final ExecutorService sendService =
+ private final ExecutorService executor =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(false),
@@ -258,7 +258,7 @@ public class RPCNetwork implements Network, MethodHandler {
} else if (ctx.hasError) {
replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
- sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
+ executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
}
@@ -278,7 +278,7 @@ public class RPCNetwork implements Network, MethodHandler {
listener.shutdown().join();
orb.transport().shutdown().join();
targetPool.flushTargets(true);
- sendService.shutdown();
+ executor.shutdown();
return true;
}
return false;
@@ -411,6 +411,10 @@ public class RPCNetwork implements Network, MethodHandler {
return oosManager;
}
+ ExecutorService getExecutor() {
+ return executor;
+ }
+
private class SendTask implements Runnable {
final Protocol protocol;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
index d7b4887bd36..daa31ae2701 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -107,6 +107,10 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
@Override
public final void handleRequestDone(Request req) {
+ net.getExecutor().execute(() -> doRequestDone(req));
+ }
+
+ private void doRequestDone(Request req) {
SendContext ctx = (SendContext)req.getContext();
String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
Reply reply = null;
@@ -157,6 +161,10 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
@Override
public final void invoke(Request request) {
request.detach();
+ net.getExecutor().execute(() -> doInvoke(request));
+ }
+
+ private void doInvoke(Request request) {
Params p = toParams(request.parameters());
request.discardParameters(); // allow garbage collection of request parameters