diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-27 14:55:30 +0100 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-27 14:55:30 +0100 |
commit | 2d68d291bacdcea237cb3f6c3e5f85aa61845b88 (patch) | |
tree | 8f45ae2022e0a812e18513398e242a0a121166dd /container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java | |
parent | 5e33bb54604989bb2cef605572f7750d45eb630a (diff) |
Retrieve document summaries over jrt/protobuf
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 88d77c760e3..f8a5ee2f8c1 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -9,7 +9,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.rpc.Client.SearchResponse; +import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; @@ -25,11 +25,13 @@ import java.util.concurrent.TimeUnit; * * @author ollivir */ -public class RpcSearchInvoker extends SearchInvoker { +public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver { + private final String RPC_METHOD = "vespa.searchprotocol.search"; + private final VespaBackEndSearcher searcher; private final Node node; private final RpcResourcePool resourcePool; - private final BlockingQueue<Client.SearchResponseOrError> responses; + private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses; private Query query; @@ -50,15 +52,16 @@ public class RpcSearchInvoker extends SearchInvoker { Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); if (nodeConnection == null) { - responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); + responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key())); responseAvailable(); return; } + query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key()); - var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true); + var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId()); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, + resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds); } @@ -68,7 +71,7 @@ public class RpcSearchInvoker extends SearchInvoker { if (timeLeftMs <= 0) { return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); } - Client.SearchResponseOrError response = null; + Client.ResponseOrError<ProtobufResponse> response = null; try { response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -84,11 +87,11 @@ public class RpcSearchInvoker extends SearchInvoker { return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); } - SearchResponse searchResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(searchResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, - searchResponse.uncompressedSize()); - var result = ProtobufSerialization.deserializeToResult(payload, query, searcher); + ProtobufResponse protobufResponse = response.response().get(); + CompressionType compression = CompressionType.valueOf(protobufResponse.compression()); + byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, + protobufResponse.uncompressedSize()); + var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher); result.hits().unorderedIterator().forEachRemaining(hit -> { if(hit instanceof FastHit) { FastHit fhit = (FastHit) hit; @@ -106,7 +109,7 @@ public class RpcSearchInvoker extends SearchInvoker { // nothing to release } - public void receive(Client.SearchResponseOrError response) { + public void receive(Client.ResponseOrError<ProtobufResponse> response) { responses.add(response); responseAvailable(); } |