diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index c84795352f5..4e538fb54dc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; import com.google.protobuf.InvalidProtocolBufferException; import com.yahoo.collections.ListMap; import com.yahoo.collections.Pair; -import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.container.protect.Error; import com.yahoo.data.access.Inspector; @@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker { private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName()); private final DocumentDatabase documentDb; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool resourcePool; private final boolean summaryNeedsQuery; private final String serverId; + private final CompressPayload compressor; private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses; @@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker { /** The number of responses we should receive (and process) before this is complete */ private int outstandingResponses; - RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { + RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { this.documentDb = documentDb; this.resourcePool = resourcePool; this.serverId = serverId; this.summaryNeedsQuery = summaryNeedsQuery; + this.compressor = compressor; } @Override @@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { Hit h = i.next(); - if (!(h instanceof FastHit)) + if (!(h instanceof FastHit hit)) continue; - FastHit hit = (FastHit) h; hitsByNode.put(hit.getDistributionKey(), hit); } @@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { } Query query = result.getQuery(); - Compressor.Compression compressionResult = resourcePool.compress(query, payload); + Compressor.Compression compressionResult = compressor.compress(query, payload); node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), clientTimeout); } @@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { hasReportedError = true; } else { Client.ProtobufResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, - response.uncompressedSize()); + byte[] responseBytes = compressor.decompress(response); return fill(result, hitsContext, summaryClass, responseBytes); } return 0; |