summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java184
1 files changed, 184 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
new file mode 100644
index 00000000000..59bc781c8b2
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
@@ -0,0 +1,184 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.GetDocSumsPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
+
+/**
+ * {@link FillInvoker} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4FillInvoker extends FillInvoker {
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+
+ private int expectedFillResults = 0;
+
+ // fdispatch code path
+ FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ this.searcher = searcher;
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+
+ if (countUnfilledFastHits(result, summaryClass) > 0) {
+ try {
+ expectedFillResults = requestSummaries(result, summaryClass);
+ } catch (InvalidChannelException e) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
+ }
+ } else {
+ expectedFillResults = 0;
+ }
+ }
+
+
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ if (expectedFillResults == 0) {
+ return;
+ }
+
+ Packet[] receivedPackets;
+ try {
+ receivedPackets = getSummaryResponses(result);
+ } catch (InvalidChannelException e1) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ return;
+ } catch (ChannelTimeoutException e1) {
+ result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
+ return;
+ }
+
+ if (receivedPackets.length == 0) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
+ return;
+ }
+
+ int skippedHits;
+ try {
+ FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass);
+ skippedHits = fillHitsResult.skippedHits;
+ if (fillHitsResult.error != null) {
+ result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
+ return;
+ }
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout(e.getMessage()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
+ return;
+ }
+
+ if (skippedHits > 0)
+ result.hits().addError(
+ ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits"));
+ result.analyzeHits();
+
+ if (channel.getQuery().getTraceLevel() >= 3) {
+ int hitNumber = 0;
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (!(hit instanceof FastHit))
+ continue;
+ FastHit fastHit = (FastHit) hit;
+
+ String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend");
+ if (!fastHit.isFilled(summaryClass))
+ traceMsg += ". Error, hit, not filled";
+ channel.getQuery().trace(traceMsg, false, 3);
+ }
+ }
+ }
+
+ @Override
+ public void release() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private int countUnfilledFastHits(Result result, String summaryClass) {
+ int count = 0;
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ Hit hit = i.next();
+ if (hit instanceof FastHit && !hit.isFilled(summaryClass))
+ count++;
+ }
+ return count;
+ }
+
+ private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException {
+
+ boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery());
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace((summaryNeedsQuery ? "FS4: Resending " : "Not resending ") + "query during document summary fetching", 3);
+
+ GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery);
+ int compressionLimit = result.getQuery().properties().getInteger(FS4SearchInvoker.PACKET_COMPRESSION_LIMIT, 0);
+ docsumsPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0) {
+ docsumsPacket.setCompressionType(result.getQuery().properties().getString(FS4SearchInvoker.PACKET_COMPRESSION_TYPE, "lz4"));
+ }
+
+ boolean couldSend = channel.sendPacket(docsumsPacket);
+ if (!couldSend)
+ throw new IOException("Could not successfully send GetDocSumsPacket.");
+
+ return docsumsPacket.getNumDocsums() + 1;
+ }
+
+ private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException {
+ if(expectedFillResults == 0) {
+ return new Packet[0];
+ }
+ BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults);
+
+ return convertBasicPackets(receivedPackets);
+ }
+
+ private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException {
+ // trying to cast a BasicPacket[] to Packet[] will compile,
+ // but lead to a runtime error. At least that's what I got
+ // from testing and reading the specification. I'm just happy
+ // if someone tells me what's the proper Java way of doing
+ // this. -SK
+ Packet[] packets = new Packet[basicPackets.length];
+
+ for (int i = 0; i < basicPackets.length; i++) {
+ packets[i] = (Packet) basicPackets[i];
+ }
+ return packets;
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+}