diff options
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java | 10 | ||||
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java | 8 |
2 files changed, 15 insertions, 3 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 96a0a5fecef..bb0b7bdd878 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -67,7 +67,7 @@ public class RPCNetwork implements Network, MethodHandler { private NetworkOwner owner; private final SlobrokConfigSubscriber slobroksConfig; private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true); - private final ExecutorService sendService = + private final ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.SECONDS, new SynchronousQueue<>(false), @@ -258,7 +258,7 @@ public class RPCNetwork implements Network, MethodHandler { } else if (ctx.hasError) { replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version."); } else { - sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); + executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); } } @@ -278,7 +278,7 @@ public class RPCNetwork implements Network, MethodHandler { listener.shutdown().join(); orb.transport().shutdown().join(); targetPool.flushTargets(true); - sendService.shutdown(); + executor.shutdown(); return true; } return false; @@ -411,6 +411,10 @@ public class RPCNetwork implements Network, MethodHandler { return oosManager; } + ExecutorService getExecutor() { + return executor; + } + private class SendTask implements Runnable { final Protocol protocol; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java index d7b4887bd36..daa31ae2701 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java @@ -107,6 +107,10 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai @Override public final void handleRequestDone(Request req) { + net.getExecutor().execute(() -> doRequestDone(req)); + } + + private void doRequestDone(Request req) { SendContext ctx = (SendContext)req.getContext(); String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName(); Reply reply = null; @@ -157,6 +161,10 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai @Override public final void invoke(Request request) { request.detach(); + net.getExecutor().execute(() -> doInvoke(request)); + } + + private void doInvoke(Request request) { Params p = toParams(request.parameters()); request.discardParameters(); // allow garbage collection of request parameters |