summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java50
1 files changed, 24 insertions, 26 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 2aa01b05955..7e48733106a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -29,31 +29,6 @@ class RpcClient implements Client {
return new RpcNodeConnection(hostname, port, supervisor);
}
- @Override
- public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request("proton.getDocsums");
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedSlime));
-
- request.setContext(hits);
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver));
- }
-
- @Override
- public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- ResponseReceiver responseReceiver, double timeoutSeconds) {
- Request request = new Request(rpcMethod);
- request.parameters().add(new Int8Value(compression.getCode()));
- request.parameters().add(new Int32Value(uncompressedLength));
- request.parameters().add(new DataValue(compressedPayload));
-
- RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver));
- }
-
private static class RpcNodeConnection implements NodeConnection {
// Information about the connected node
@@ -73,7 +48,30 @@ class RpcClient implements Client {
description = "rpc node connection to " + hostname + ":" + port;
}
- public void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
+ @Override
+ public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength,
+ byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request("proton.getDocsums");
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedSlime));
+
+ request.setContext(hits);
+ invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver));
+ }
+
+ @Override
+ public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request(rpcMethod);
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedPayload));
+
+ invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(this, responseReceiver));
+ }
+
+ private void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
// TODO: Consider replacing this by a watcher on the target
synchronized(this) { // ensure we have exactly 1 valid connection across threads
if (target == null || ! target.isValid())