path: root/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java')
1 files changed, 253 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
new file mode 100644
index 00000000000..578c447dfbe
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
@@ -0,0 +1,253 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+import com.yahoo.collections.ListMap;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.container.protect.Error;
+import com.yahoo.data.access.Inspector;
+import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.prelude.fastsearch.TimeoutException;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.query.SessionId;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.slime.ArrayTraverser;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Slime;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+ * {@link FillInvoker} implementation using RPC
+ *
+ * @author bratseth
+ * @author ollivir
+ */
+public class RpcFillInvoker extends FillInvoker {
+ private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName());
+ private final DocumentDatabase documentDb;
+ private final RpcResourcePool resourcePool;
+ private GetDocsumsResponseReceiver responseReceiver;
+ RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) {
+ this.documentDb = documentDb;
+ this.resourcePool = resourcePool;
+ }
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+ ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
+ CompressionType compression = CompressionType
+ .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3);
+ responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result);
+ for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
+ sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver);
+ }
+ }
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ try {
+ responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb);
+ result.hits().setSorted(false);
+ result.analyzeHits();
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
+ }
+ }
+ @Override
+ protected void release() {
+ // nothing to release
+ }
+ /** Return a map of hits by their search node (partition) id */
+ private static ListMap<Integer, FastHit> hitsByNode(Result result) {
+ ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
+ for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (!(h instanceof FastHit))
+ continue;
+ FastHit hit = (FastHit) h;
+ hitsByNode.put(hit.getDistributionKey(), hit);
+ }
+ return hitsByNode;
+ }
+ /** Send a getDocsums request to a node. Responses will be added to the given receiver. */
+ private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression,
+ Result result, GetDocsumsResponseReceiver responseReceiver) {
+ Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ if (node == null) {
+ String error = "Could not fill hits from unknown node " + nodeId;
+ responseReceiver.receive(Client.GetDocsumsResponseOrError.fromError(error));
+ result.hits().addError(ErrorMessage.createEmptyDocsums(error));
+ log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
+ return;
+ }
+ Query query = result.getQuery();
+ String rankProfile = query.getRanking().getProfile();
+ byte[] serializedSlime = BinaryFormat
+ .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits));
+ double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
+ Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime);
+ resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(),
+ responseReceiver, timeoutSeconds);
+ }
+ static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ if (summaryClass != null) {
+ root.setString("class", summaryClass);
+ }
+ if (sessionId != null) {
+ root.setData("sessionid", sessionId.asUtf8String().getBytes());
+ }
+ if (docType != null) {
+ root.setString("doctype", docType);
+ }
+ if (rankProfile != null) {
+ root.setString("ranking", rankProfile);
+ }
+ Cursor gids = root.setArray("gids");
+ for (FastHit hit : hits) {
+ gids.addData(hit.getGlobalId().getRawId());
+ }
+ return slime;
+ }
+ /** Receiver of the responses to a set of getDocsums requests */
+ public static class GetDocsumsResponseReceiver {
+ private final BlockingQueue<Client.GetDocsumsResponseOrError> responses;
+ private final Compressor compressor;
+ private final Result result;
+ /** Whether we have already logged/notified about an error - to avoid spamming */
+ private boolean hasReportedError = false;
+ /** The number of responses we should receive (and process) before this is complete */
+ private int outstandingResponses;
+ GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) {
+ this.compressor = compressor;
+ responses = new LinkedBlockingQueue<>(requestCount);
+ outstandingResponses = requestCount;
+ this.result = result;
+ }
+ /** Called by a thread belonging to the client when a valid response becomes available */
+ public void receive(Client.GetDocsumsResponseOrError response) {
+ responses.add(response);
+ }
+ private void throwTimeout() throws TimeoutException {
+ throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding.");
+ }
+ /**
+ * Call this from the dispatcher thread to initiate and complete processing of responses.
+ * This will block until all responses are available and processed, or to timeout.
+ */
+ void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException {
+ try {
+ int skippedHits = 0;
+ while (outstandingResponses > 0) {
+ long timeLeftMs = query.getTimeLeft();
+ if (timeLeftMs <= 0) {
+ throwTimeout();
+ }
+ Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
+ if (response == null)
+ throwTimeout();
+ skippedHits += processResponse(response, summaryClass, documentDb);
+ outstandingResponses--;
+ }
+ if (skippedHits != 0) {
+ result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " +
+ summaryClass + " for " + skippedHits + " hits"));
+ }
+ }
+ catch (InterruptedException e) {
+ // TODO: Add error
+ }
+ }
+ private int processResponse(Client.GetDocsumsResponseOrError responseOrError,
+ String summaryClass,
+ DocumentDatabase documentDb) {
+ if (responseOrError.error().isPresent()) {
+ if (hasReportedError) return 0;
+ String error = responseOrError.error().get();
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
+ log.log(Level.WARNING, "Error fetching summary data: "+ error);
+ }
+ else {
+ Client.GetDocsumsResponse response = responseOrError.response().get();
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
+ return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes);
+ }
+ return 0;
+ }
+ private void addErrors(com.yahoo.slime.Inspector errors) {
+ errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> {
+ int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString()))
+ ? Error.TIMEOUT.code
+ : Error.UNSPECIFIED.code;
+ result.hits().addError(new ErrorMessage(errorCode,
+ value.field("message").asString(), value.field("details").asString()));
+ });
+ }
+ private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) {
+ com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get();
+ com.yahoo.slime.Inspector errors = root.field("errors");
+ boolean hasErrors = errors.valid() && (errors.entries() > 0);
+ if (hasErrors) {
+ addErrors(errors);
+ }
+ Inspector summaries = new SlimeAdapter(root.field("docsums"));
+ if ( ! summaries.valid())
+ return 0; // No summaries; Perhaps we requested a non-existing summary class
+ int skippedHits = 0;
+ for (int i = 0; i < hits.size(); i++) {
+ Inspector summary = summaries.entry(i).field("docsum");
+ if (summary.fieldCount() != 0) {
+ hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName());
+ hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
+ hits.get(i).setFilled(summaryClass);
+ } else {
+ skippedHits++;
+ }
+ }
+ return skippedHits;
+ }
+ }