diff options
8 files changed, 0 insertions, 582 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index ca40683a887..0a7357f4a86 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -84,9 +84,6 @@ interface Client { } interface NodeConnection { - void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, - RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); - void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index bb0bbf4b529..918d9566913 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -60,18 +60,6 @@ class RpcClient implements Client { } @Override - public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request("proton.getDocsums"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedSlime)); - - request.setContext(hits); - invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(this, responseReceiver)); - } - - @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { Request request = new Request(rpcMethod); @@ -104,44 +92,6 @@ class RpcClient implements Client { } - private static class RpcDocsumResponseWaiter implements RequestWaiter { - - /** The node to which we made the request we are waiting for - for error messages only */ - private final RpcNodeConnection node; - - /** The handler to which the response is forwarded */ - private final RpcFillInvoker.GetDocsumsResponseReceiver handler; - - public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { - this.node = node; - this.handler = handler; - } - - @Override - public void handleRequestDone(Request requestWithResponse) { - if (requestWithResponse.isError()) { - handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); - return; - } - - Values returnValues = requestWithResponse.returnValues(); - if (returnValues.size() < 3) { - handler.receive(ResponseOrError.fromError( - "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); - return; - } - - byte compression = returnValues.get(0).asInt8(); - int uncompressedSize = returnValues.get(1).asInt32(); - byte[] compressedSlimeBytes = returnValues.get(2).asData(); - @SuppressWarnings("unchecked") // TODO: Non-protobuf rpc docsums to be removed soon - List<FastHit> hits = (List<FastHit>) requestWithResponse.getContext(); - handler.receive( - ResponseOrError.fromResponse(new GetDocsumsResponse(compression, uncompressedSize, compressedSlimeBytes, hits))); - } - - } - private static class RpcProtobufResponseWaiter implements RequestWaiter { /** The node to which we made the request we are waiting for - for error messages only */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java deleted file mode 100644 index ad5d129ef6d..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.collections.ListMap; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.container.protect.Error; -import com.yahoo.data.access.Inspector; -import com.yahoo.data.access.slime.SlimeAdapter; -import com.yahoo.prelude.Location; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.TimeoutException; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.dispatch.rpc.Client.GetDocsumsResponse; -import com.yahoo.search.query.SessionId; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.slime.ArrayTraverser; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Slime; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * {@link FillInvoker} implementation using RPC - * - * @author bratseth - * @author ollivir - */ -public class RpcFillInvoker extends FillInvoker { - private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName()); - - private final DocumentDatabase documentDb; - private final RpcResourcePool resourcePool; - private GetDocsumsResponseReceiver responseReceiver; - - RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) { - this.documentDb = documentDb; - this.resourcePool = resourcePool; - } - - @Override - protected void sendFillRequest(Result result, String summaryClass) { - ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); - Query query = result.getQuery(); - - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - if (query.getTraceLevel() >= 3) { - query.trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3); - query.trace("RpcSlime: Not resending query during document summary fetching", 3); - } - - responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result); - for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { - sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver); - } - } - - @Override - protected void getFillResults(Result result, String summaryClass) { - try { - responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb); - result.hits().setSorted(false); - result.analyzeHits(); - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); - } - } - - @Override - protected void release() { - // nothing to release - } - - /** Return a map of hits by their search node (partition) id */ - private static ListMap<Integer, FastHit> hitsByNode(Result result) { - ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); - for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { - Hit h = i.next(); - if (!(h instanceof FastHit)) - continue; - FastHit hit = (FastHit) h; - - hitsByNode.put(hit.getDistributionKey(), hit); - } - return hitsByNode; - } - - /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ - private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, - Result result, GetDocsumsResponseReceiver responseReceiver) { - Client.NodeConnection node = resourcePool.getConnection(nodeId); - if (node == null) { - String error = "Could not fill hits from unknown node " + nodeId; - responseReceiver.receive(Client.ResponseOrError.fromError(error)); - result.hits().addError(ErrorMessage.createEmptyDocsums(error)); - log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); - return; - } - - Query query = result.getQuery(); - String rankProfile = query.getRanking().getProfile(); - byte[] serializedSlime = BinaryFormat - .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), - query.getSessionId(), query.getRanking().getLocation(), hits)); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compress(query, serializedSlime); - node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds); - } - - static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, Location location, List<FastHit> hits) { - Slime slime = new Slime(); - Cursor root = slime.setObject(); - if (summaryClass != null) { - root.setString("class", summaryClass); - } - if (sessionId != null) { - root.setData("sessionid", sessionId.asUtf8String().getBytes()); - } - if (docType != null) { - root.setString("doctype", docType); - } - if (rankProfile != null) { - root.setString("ranking", rankProfile); - } - if (location != null) { - root.setString("location", location.backendString()); - } - Cursor gids = root.setArray("gids"); - for (FastHit hit : hits) { - gids.addData(hit.getRawGlobalId()); - } - return slime; - } - - /** Receiver of the responses to a set of getDocsums requests */ - public static class GetDocsumsResponseReceiver { - - private final BlockingQueue<Client.ResponseOrError<GetDocsumsResponse>> responses; - private final Compressor compressor; - private final Result result; - - /** Whether we have already logged/notified about an error - to avoid spamming */ - private boolean hasReportedError = false; - - /** The number of responses we should receive (and process) before this is complete */ - private int outstandingResponses; - - GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) { - this.compressor = compressor; - responses = new LinkedBlockingQueue<>(requestCount); - outstandingResponses = requestCount; - this.result = result; - } - - /** Called by a thread belonging to the client when a valid response becomes available */ - public void receive(Client.ResponseOrError<GetDocsumsResponse> response) { - responses.add(response); - } - - private void throwTimeout() throws TimeoutException { - throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); - } - - /** - * Call this from the dispatcher thread to initiate and complete processing of responses. - * This will block until all responses are available and processed, or to timeout. - */ - void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException { - try { - int skippedHits = 0; - while (outstandingResponses > 0) { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - throwTimeout(); - } - Client.ResponseOrError<GetDocsumsResponse> response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - if (response == null) - throwTimeout(); - skippedHits += processResponse(response, summaryClass, documentDb); - outstandingResponses--; - } - if (skippedHits != 0) { - result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + - summaryClass + " for " + skippedHits + " hits")); - } - } - catch (InterruptedException e) { - // TODO: Add error - } - } - - private int processResponse(Client.ResponseOrError<GetDocsumsResponse> responseOrError, - String summaryClass, - DocumentDatabase documentDb) { - if (responseOrError.error().isPresent()) { - if (hasReportedError) return 0; - String error = responseOrError.error().get(); - result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); - log.log(Level.WARNING, "Error fetching summary data: "+ error); - } - else { - Client.GetDocsumsResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize()); - return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes); - } - return 0; - } - - private void addErrors(com.yahoo.slime.Inspector errors) { - errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> { - int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) - ? Error.TIMEOUT.code - : Error.UNSPECIFIED.code; - result.hits().addError(new ErrorMessage(errorCode, - value.field("message").asString(), value.field("details").asString())); - }); - } - - private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) { - com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get(); - com.yahoo.slime.Inspector errors = root.field("errors"); - boolean hasErrors = errors.valid() && (errors.entries() > 0); - if (hasErrors) { - addErrors(errors); - } - - Inspector summaries = new SlimeAdapter(root.field("docsums")); - if ( ! summaries.valid()) - return 0; // No summaries; Perhaps we requested a non-existing summary class - int skippedHits = 0; - for (int i = 0; i < hits.size(); i++) { - Inspector summary = summaries.entry(i).field("docsum"); - if (summary.valid()) { - hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); - hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); - hits.get(i).setFilled(summaryClass); - } else { - skippedHits++; - } - } - return skippedHits; - } - - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index ba68847e0ab..25514ae4a23 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -43,13 +43,7 @@ public class RpcInvokerFactory extends InvokerFactory { Query query = result.getQuery(); boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); - return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery); } - // for testing - public FillInvoker createFillInvoker(DocumentDatabase documentDb) { - return new RpcFillInvoker(rpcResourcePool, documentDb); - } - } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java deleted file mode 100644 index 288167022d8..00000000000 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.prelude.fastsearch.DocsumDefinition; -import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; -import com.yahoo.prelude.fastsearch.DocsumField; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Query; -import com.yahoo.search.Result; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -/** - * Tests using a dispatcher to fill a result - * - * @author bratseth - */ -public class FillTestCase { - - private MockClient client = new MockClient(); - - @Test - public void testFilling() { - Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - nodes.put(1, client.createConnection("host1", 123)); - nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(2, 1)); - result.hits().add(createHit(1, 2)); - result.hits().add(createHit(2, 3)); - result.hits().add(createHit(0, 4)); - - client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); - client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); - client.setDocsumReponse("host1", 2, "summaryClass1", map("field1", "s.1.2", "field2", 2)); - client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4)); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); - assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); - assertEquals("s.1.2", result.hits().get("hit:2").getField("field1").toString()); - assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); - assertEquals("s.0.4", result.hits().get("hit:4").getField("field1").toString()); - assertEquals(0L, result.hits().get("hit:0").getField("field2")); - assertEquals(1L, result.hits().get("hit:1").getField("field2")); - assertEquals(2L, result.hits().get("hit:2").getField("field2")); - assertEquals(3L, result.hits().get("hit:3").getField("field2")); - assertEquals(4L, result.hits().get("hit:4").getField("field2")); - } - - @Test - public void testEmptyHits() { - Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - nodes.put(1, client.createConnection("host1", 123)); - nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(2, 1)); - result.hits().add(createHit(1, 2)); - result.hits().add(createHit(2, 3)); - result.hits().add(createHit(0, 4)); - - client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); - client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); - client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>()); - client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1", new HashMap<>()); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); - assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); - assertNull(result.hits().get("hit:2").getField("field1")); - assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); - assertNull(result.hits().get("hit:4").getField("field1")); - - assertEquals(0L, result.hits().get("hit:0").getField("field2")); - assertEquals(1L, result.hits().get("hit:1").getField("field2")); - assertNull(result.hits().get("hit:2").getField("field2")); - assertEquals(3L, result.hits().get("hit:3").getField("field2")); - assertNull(result.hits().get("hit:4").getField("field2")); - - assertNull(result.hits().getError()); - } - - @Test - public void testMissingHits() { - Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - nodes.put(1, client.createConnection("host1", 123)); - nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(2, 1)); - result.hits().add(createHit(1, 2)); - result.hits().add(createHit(2, 3)); - result.hits().add(createHit(0, 4)); - - client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); - client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); - client.setDocsumReponse("host1", 2, "summaryClass1", null); - client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1", null); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); - assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); - assertNull(result.hits().get("hit:2").getField("field1")); - assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); - assertNull(result.hits().get("hit:4").getField("field1")); - - assertEquals(0L, result.hits().get("hit:0").getField("field2")); - assertEquals(1L, result.hits().get("hit:1").getField("field2")); - assertNull(result.hits().get("hit:2").getField("field2")); - assertEquals(3L, result.hits().get("hit:3").getField("field2")); - assertNull(result.hits().get("hit:4").getField("field2")); - - assertEquals("Missing hit summary data for summary summaryClass1 for 2 hits", result.hits().getError().getDetailedMessage()); - } - - @Test - public void testErrorHandling() { - client.setMalfunctioning(true); - - Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage()); - } - - @Test - public void testSendingFill2UnknownNode() { - client.setMalfunctioning(true); - - Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(1, 1)); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage()); - } - - private DocumentDatabase db() { - List<DocsumField> fields = new ArrayList<>(); - fields.add(DocsumField.create("field1", "string")); - fields.add(DocsumField.create("field2", "int64")); - DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", fields))); - return new DocumentDatabase("default", docsums, Collections.emptySet()); - } - - private FastHit createHit(int sourceNodeId, int hitId) { - FastHit hit = new FastHit("hit:" + hitId, 1.0); - hit.setPartId(sourceNodeId); - hit.setDistributionKey(sourceNodeId); - hit.setGlobalId(client.globalIdFrom(hitId).getRawId()); - return hit; - } - - private Map<String, Object> map(String stringKey, String stringValue, String intKey, int intValue) { - Map<String, Object> map = new HashMap<>(); - map.put(stringKey, stringValue); - map.put(intKey, intValue); - return map; - } - -} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java index 8ebdfcc1a12..61971e975e5 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java @@ -55,46 +55,6 @@ public class MockClient implements Client { } @Override - public void getDocsums(List<FastHit> hitsContext, CompressionType compression, int uncompressedSize, byte[] compressedSlime, - RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - if (malfunctioning) { - responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); - return; - } - - Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get(); - String docsumClass = request.field("class").asString(); - List<Map<String, Object>> docsumsToReturn = new ArrayList<>(); - request.field("gids").traverse((ArrayTraverser) (index, gid) -> { - GlobalId docId = new GlobalId(gid.asData()); - docsumsToReturn.add(docsums.get(new DocsumKey(toString(), docId, docsumClass))); - }); - Slime responseSlime = new Slime(); - Cursor root = responseSlime.setObject(); - Cursor docsums = root.setArray("docsums"); - for (Map<String, Object> docsumFields : docsumsToReturn) { - if (docsumFields == null) continue; - - Cursor docsumItem = docsums.addObject(); - Cursor docsum = docsumItem.setObject("docsum"); - for (Map.Entry<String, Object> field : docsumFields.entrySet()) { - if (field.getValue() instanceof Integer) - docsum.setLong(field.getKey(), (Integer) field.getValue()); - else if (field.getValue() instanceof String) - docsum.setString(field.getKey(), (String) field.getValue()); - else - throw new RuntimeException(); - } - } - byte[] slimeBytes = BinaryFormat.encode(responseSlime); - CompressionType responseCompressionType = compression == CompressionType.INCOMPRESSIBLE ? CompressionType.NONE : compression; - Compressor.Compression compressionResult = compressor.compress(responseCompressionType, slimeBytes); - GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, - compressionResult.data(), hitsContext); - responseReceiver.receive(ResponseOrError.fromResponse(response)); - } - - @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { if (malfunctioning) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockRpcResourcePoolBuilder.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockRpcResourcePoolBuilder.java index dbef9d819e8..23d6ae6bf2b 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockRpcResourcePoolBuilder.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockRpcResourcePoolBuilder.java @@ -5,7 +5,6 @@ import com.yahoo.compress.CompressionType; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.search.dispatch.rpc.Client.ResponseReceiver; -import com.yahoo.search.dispatch.rpc.RpcFillInvoker.GetDocsumsResponseReceiver; import java.util.HashMap; import java.util.List; @@ -35,12 +34,6 @@ public class MockRpcResourcePoolBuilder { } @Override - public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, - GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - responseReceiver.receive(Client.ResponseOrError.fromError("getDocsums(..) attempted for node " + key)); - } - - @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { responseReceiver.receive(Client.ResponseOrError.fromError("request('"+rpcMethod+"', ..) attempted for node " + key)); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index 27fc3f85136..45ad361a214 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -9,7 +9,6 @@ import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.RpcFillInvoker.GetDocsumsResponseReceiver; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.searchchain.Execution; import org.junit.Test; @@ -85,12 +84,6 @@ public class RpcSearchInvokerTest { public NodeConnection createConnection(String hostname, int port) { return new NodeConnection() { @Override - public void getDocsums(List<FastHit> hits, CompressionType compression, int uncompressedLength, byte[] compressedSlime, - GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - fail("Unexpected call"); - } - - @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { compressionTypeHolder.set(compression); |