aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java16
1 files changed, 7 insertions, 9 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index c84795352f5..4e538fb54dc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.collections.Pair;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.Inspector;
@@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker {
private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
private final DocumentDatabase documentDb;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final boolean summaryNeedsQuery;
private final String serverId;
+ private final CompressPayload compressor;
private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses;
@@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker {
/** The number of responses we should receive (and process) before this is complete */
private int outstandingResponses;
- RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
+ RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
this.documentDb = documentDb;
this.resourcePool = resourcePool;
this.serverId = serverId;
this.summaryNeedsQuery = summaryNeedsQuery;
+ this.compressor = compressor;
}
@Override
@@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
Hit h = i.next();
- if (!(h instanceof FastHit))
+ if (!(h instanceof FastHit hit))
continue;
- FastHit hit = (FastHit) h;
hitsByNode.put(hit.getDistributionKey(), hit);
}
@@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
Query query = result.getQuery();
- Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ Compressor.Compression compressionResult = compressor.compress(query, payload);
node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(),
roe -> receive(roe, hits), clientTimeout);
}
@@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
hasReportedError = true;
} else {
Client.ProtobufResponse response = responseOrError.response().get();
- CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression,
- response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response);
return fill(result, hitsContext, summaryClass, responseBytes);
}
return 0;