summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java127
1 files changed, 127 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
new file mode 100644
index 00000000000..2a4767bc389
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
@@ -0,0 +1,127 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
+import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.Values;
+import com.yahoo.prelude.fastsearch.FastHit;
+
+import java.util.List;
+
+/**
+ * A client which uses rpc request to search nodes to implement the Client API.
+ *
+ * @author bratseth
+ */
+class RpcClient implements Client {
+
+ private final Supervisor supervisor = new Supervisor(new Transport());
+
+ @Override
+ public NodeConnection createConnection(String hostname, int port) {
+ return new RpcNodeConnection(hostname, port, supervisor);
+ }
+
+ @Override
+ public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
+ byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request("proton.getDocsums");
+ request.parameters().add(new Int8Value(compression.getCode()));
+ request.parameters().add(new Int32Value(uncompressedLength));
+ request.parameters().add(new DataValue(compressedSlime));
+
+ request.setContext(hits);
+ RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
+ rpcNode.invokeAsync(request, timeoutSeconds, new RpcResponseWaiter(rpcNode, responseReceiver));
+ }
+
+ private static class RpcNodeConnection implements NodeConnection {
+
+ // Information about the connected node
+ private final Supervisor supervisor;
+ private final String hostname;
+ private final int port;
+ private final String description;
+
+ // The current shared connection. This will be recycled when it becomes invalid.
+ // All access to this must be synchronized
+ private Target target = null;
+
+ public RpcNodeConnection(String hostname, int port, Supervisor supervisor) {
+ this.supervisor = supervisor;
+ this.hostname = hostname;
+ this.port = port;
+ description = "rpc node connection to " + hostname + ":" + port;
+ }
+
+ public void invokeAsync(Request req, double timeout, RequestWaiter waiter) {
+ // TODO: Consider replacing this by a watcher on the target
+ synchronized(this) { // ensure we have exactly 1 valid connection across threads
+ if (target == null || ! target.isValid())
+ target = supervisor.connect(new Spec(hostname, port));
+ }
+ target.invokeAsync(req, timeout, waiter);
+ }
+
+ @Override
+ public void close() {
+ target.close();
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ }
+
+ private static class RpcResponseWaiter implements RequestWaiter {
+
+ /** The node to which we made the request we are waiting for - for error messages only */
+ private final RpcNodeConnection node;
+
+ /** The handler to which the response is forwarded */
+ private final RpcFillInvoker.GetDocsumsResponseReceiver handler;
+
+ public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) {
+ this.node = node;
+ this.handler = handler;
+ }
+
+ @Override
+ public void handleRequestDone(Request requestWithResponse) {
+ if (requestWithResponse.isError()) {
+ handler.receive(GetDocsumsResponseOrError.fromError("Error response from " + node + ": " +
+ requestWithResponse.errorMessage()));
+ return;
+ }
+
+ Values returnValues = requestWithResponse.returnValues();
+ if (returnValues.size() < 3) {
+ handler.receive(GetDocsumsResponseOrError.fromError("Invalid getDocsums response from " + node +
+ ": Expected 3 return arguments, got " +
+ returnValues.size()));
+ return;
+ }
+
+ byte compression = returnValues.get(0).asInt8();
+ int uncompressedSize = returnValues.get(1).asInt32();
+ byte[] compressedSlimeBytes = returnValues.get(2).asData();
+ List<FastHit> hits = (List<FastHit>) requestWithResponse.getContext();
+ handler.receive(GetDocsumsResponseOrError.fromResponse(new GetDocsumsResponse(compression,
+ uncompressedSize,
+ compressedSlimeBytes,
+ hits)));
+ }
+
+ }
+
+}