diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-03-15 18:14:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-15 18:14:17 +0100 |
commit | 7f56c30703f5319f5368653f6313a8f12f07b3cb (patch) | |
tree | 394c9717eee127c407e3d3222169bab4a420228e /container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java | |
parent | 156743b84071f04114bc4479a3bcc3ca10b2f7e5 (diff) |
Revert "Protobuf over jrt support in search"
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java | 118 |
1 files changed, 0 insertions, 118 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java deleted file mode 100644 index 88d77c760e3..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.rpc.Client.SearchResponse; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.searchchain.Execution; - -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * {@link SearchInvoker} implementation using RPC - * - * @author ollivir - */ -public class RpcSearchInvoker extends SearchInvoker { - private final VespaBackEndSearcher searcher; - private final Node node; - private final RpcResourcePool resourcePool; - private final BlockingQueue<Client.SearchResponseOrError> responses; - - private Query query; - - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) { - super(Optional.of(node)); - this.searcher = searcher; - this.node = node; - this.resourcePool = resourcePool; - this.responses = new LinkedBlockingQueue<>(1); - } - - @Override - protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { - this.query = query; - - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); - if (nodeConnection == null) { - responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); - responseAvailable(); - return; - } - - var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, - timeoutSeconds); - } - - @Override - protected Result getSearchResult(Execution execution) throws IOException { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - Client.SearchResponseOrError response = null; - try { - response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // handled as timeout - } - if (response == null) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - if (response.error().isPresent()) { - return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get())); - } - if (response.response().isEmpty()) { - return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); - } - - SearchResponse searchResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(searchResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, - searchResponse.uncompressedSize()); - var result = ProtobufSerialization.deserializeToResult(payload, query, searcher); - result.hits().unorderedIterator().forEachRemaining(hit -> { - if(hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - fhit.setPartId(node.pathIndex()); - fhit.setDistributionKey(node.key()); - } - hit.setSource(getName()); - }); - - return result; - } - - @Override - protected void release() { - // nothing to release - } - - public void receive(Client.SearchResponseOrError response) { - responses.add(response); - responseAvailable(); - } - - private String getName() { - return searcher.getName(); - } - -} |