diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java | 50 |
1 files changed, 24 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 2aa01b05955..7e48733106a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -29,31 +29,6 @@ class RpcClient implements Client { return new RpcNodeConnection(hostname, port, supervisor); } - @Override - public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request("proton.getDocsums"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedSlime)); - - request.setContext(hits); - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); - } - - @Override - public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - ResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request(rpcMethod); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedPayload)); - - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver)); - } - private static class RpcNodeConnection implements NodeConnection { // Information about the connected node @@ -73,7 +48,30 @@ class RpcClient implements Client { description = "rpc node connection to " + hostname + ":" + port; } - public void invokeAsync(Request req, double timeout, RequestWaiter waiter) { + @Override + public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, + byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request("proton.getDocsums"); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedSlime)); + + request.setContext(hits); + invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver)); + } + + @Override + public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request(rpcMethod); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedPayload)); + + invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver)); + } + + private void invokeAsync(Request req, double timeout, RequestWaiter waiter) { // TODO: Consider replacing this by a watcher on the target synchronized(this) { // ensure we have exactly 1 valid connection across threads if (target == null || ! target.isValid()) |