summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-03-27 14:55:30 +0100
committerOlli Virtanen <olli.virtanen@oath.com>2019-03-27 14:55:30 +0100
commit2d68d291bacdcea237cb3f6c3e5f85aa61845b88 (patch)
tree8f45ae2022e0a812e18513398e242a0a121166dd /container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
parent5e33bb54604989bb2cef605572f7750d45eb630a (diff)
Retrieve document summaries over jrt/protobuf
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java29
1 files changed, 16 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 88d77c760e3..f8a5ee2f8c1 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -9,7 +9,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchInvoker;
-import com.yahoo.search.dispatch.rpc.Client.SearchResponse;
+import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
@@ -25,11 +25,13 @@ import java.util.concurrent.TimeUnit;
*
* @author ollivir
*/
-public class RpcSearchInvoker extends SearchInvoker {
+public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver {
+ private final String RPC_METHOD = "vespa.searchprotocol.search";
+
private final VespaBackEndSearcher searcher;
private final Node node;
private final RpcResourcePool resourcePool;
- private final BlockingQueue<Client.SearchResponseOrError> responses;
+ private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private Query query;
@@ -50,15 +52,16 @@ public class RpcSearchInvoker extends SearchInvoker {
Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key());
if (nodeConnection == null) {
- responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key()));
+ responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key()));
responseAvailable();
return;
}
+ query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
- var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true);
+ var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
+ resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
timeoutSeconds);
}
@@ -68,7 +71,7 @@ public class RpcSearchInvoker extends SearchInvoker {
if (timeLeftMs <= 0) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
- Client.SearchResponseOrError response = null;
+ Client.ResponseOrError<ProtobufResponse> response = null;
try {
response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -84,11 +87,11 @@ public class RpcSearchInvoker extends SearchInvoker {
return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available"));
}
- SearchResponse searchResponse = response.response().get();
- CompressionType compression = CompressionType.valueOf(searchResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression,
- searchResponse.uncompressedSize());
- var result = ProtobufSerialization.deserializeToResult(payload, query, searcher);
+ ProtobufResponse protobufResponse = response.response().get();
+ CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
+ byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression,
+ protobufResponse.uncompressedSize());
+ var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher);
result.hits().unorderedIterator().forEachRemaining(hit -> {
if(hit instanceof FastHit) {
FastHit fhit = (FastHit) hit;
@@ -106,7 +109,7 @@ public class RpcSearchInvoker extends SearchInvoker {
// nothing to release
}
- public void receive(Client.SearchResponseOrError response) {
+ public void receive(Client.ResponseOrError<ProtobufResponse> response) {
responses.add(response);
responseAvailable();
}