summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java351
1 files changed, 351 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
new file mode 100644
index 00000000000..dc95f83365e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
@@ -0,0 +1,351 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.DocsumPacket;
+import com.yahoo.fs4.GetDocSumsPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.fs4.QueryResultPacket;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.CloseableChannel;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.result.HitGroup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
+
+/**
+ * {@link CloseableChannel} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4CloseableChannel extends CloseableChannel {
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+ private final Optional<Integer> distributionKey;
+
+ public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
+ int distributionKey) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.of(distributionKey);
+
+ Backend backend = fs4ResourcePool.getBackend(hostname, port);
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ // fdispatch code path
+ public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.empty();
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ @Override
+ public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ if (isLoggingFine())
+ getLogger().finest("sending query packet");
+
+ try {
+ boolean couldSend = channel.sendPacket(queryPacket);
+ if (!couldSend)
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"));
+ } catch (InvalidChannelException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()));
+ } catch (IllegalStateException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
+ }
+
+ BasicPacket[] basicPackets;
+
+ try {
+ basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
+ } catch (ChannelTimeoutException e) {
+ return new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ } catch (InvalidChannelException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
+ }
+
+ if (basicPackets.length == 0) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
+ }
+
+ if (isLoggingFine())
+ getLogger().finest("got packets " + basicPackets.length + " packets");
+
+ basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName());
+ QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
+
+ if (isLoggingFine())
+ getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
+
+ if (query.getPresentation().getSummary() == null)
+ query.getPresentation().setSummary(searcher.getDefaultDocsumClass());
+
+ Result result = new Result(query);
+
+ searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result);
+
+ searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey);
+ Packet[] packets;
+ CacheControl cacheControl = searcher.getCacheControl();
+ PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
+
+ if (packetWrapper != null) {
+ cacheControl.updateCacheEntry(cacheKey, query, resultPacket);
+ } else {
+ if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) {
+ // Don't add error here, it was done in first phase
+ // No check if packetWrapper already exists, since incomplete
+ // first phase data won't be cached anyway.
+ } else {
+ packets = new Packet[1];
+ packets[0] = resultPacket;
+ cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void partialFill(Result result, String summaryClass) {
+ Packet[] receivedPackets;
+ DocsumPacketKey[] packetKeys;
+
+ CacheKey cacheKey = null;
+ PacketWrapper packetWrapper = null;
+ if (searcher.getCacheControl().useCache(channel.getQuery())) {
+ cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
+ if (cacheKey == null) {
+ QueryPacket queryPacket = QueryPacket.create(channel.getQuery());
+ cacheKey = new CacheKey(queryPacket);
+ }
+ packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
+ }
+
+ if (countFastHits(result) > 0) {
+ packetKeys = getPacketKeys(result, summaryClass, false);
+ if (packetKeys.length == 0) {
+ receivedPackets = new Packet[0];
+ } else {
+ try {
+ receivedPackets = fetchSummaries(result, summaryClass);
+ } catch (InvalidChannelException e) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ return;
+ } catch (ChannelTimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
+ return;
+ }
+ if (receivedPackets.length == 0) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
+ return;
+ }
+ }
+ } else {
+ packetKeys = new DocsumPacketKey[0];
+ receivedPackets = new Packet[0];
+ }
+
+ int skippedHits;
+ try {
+ FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass);
+ skippedHits = fillHitsResult.skippedHits;
+ if (fillHitsResult.error != null) {
+ result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
+ return;
+ }
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout(e.getMessage()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
+ return;
+ }
+ if (skippedHits == 0 && packetWrapper != null) {
+ searcher.getCacheControl().updateCacheEntry(cacheKey, channel.getQuery(), packetKeys, receivedPackets);
+ }
+
+ if (skippedHits > 0)
+ result.hits().addError(
+ ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits"));
+ result.analyzeHits();
+
+ if (channel.getQuery().getTraceLevel() >= 3) {
+ int hitNumber = 0;
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (!(hit instanceof FastHit))
+ continue;
+ FastHit fastHit = (FastHit) hit;
+
+ String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend");
+ if (!fastHit.isFilled(summaryClass))
+ traceMsg += ". Error, hit, not filled";
+ channel.getQuery().trace(traceMsg, false, 3);
+ }
+ }
+ }
+
+ @Override
+ public void closeChannel() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) {
+ Query query = result.getQuery();
+ PacketWrapper packetWrapper = searcher.getCacheControl().lookup(cacheKey, query);
+
+ if (packetWrapper == null) {
+ return null;
+ }
+ if (packetWrapper.getNumPackets() != 0) {
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ Hit hit = i.next();
+
+ if (hit instanceof FastHit) {
+ FastHit fastHit = (FastHit) hit;
+ DocsumPacketKey key = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
+
+ if (searcher.fillHit(fastHit, (DocsumPacket) packetWrapper.getPacket(key), summaryClass).ok) {
+ fastHit.setCached(true);
+ }
+
+ }
+ }
+ result.hits().setSorted(false);
+ result.analyzeHits();
+ }
+
+ return packetWrapper;
+ }
+
+ private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) {
+ for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (h instanceof FastHit) {
+ FastHit hit = (FastHit) h;
+ if (hit.isFilled(summaryClass)) {
+ continue;
+ }
+ if (hit.getCacheKey() != null) {
+ return hit.getCacheKey();
+ }
+ }
+ }
+ return null;
+ }
+
+ private int countFastHits(Result result) {
+ int count = 0;
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ if (i.next() instanceof FastHit)
+ count++;
+ }
+ return count;
+ }
+
+ private Packet[] fetchSummaries(Result result, String summaryClass)
+ throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
+
+ BasicPacket[] receivedPackets;
+ boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery());
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3);
+
+ GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery);
+ int compressionLimit = result.getQuery().properties().getInteger(FastSearcher.PACKET_COMPRESSION_LIMIT, 0);
+ docsumsPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0) {
+ docsumsPacket.setCompressionType(result.getQuery().properties().getString(FastSearcher.PACKET_COMPRESSION_TYPE, "lz4"));
+ }
+
+ boolean couldSend = channel.sendPacket(docsumsPacket);
+ if (!couldSend)
+ throw new IOException("Could not successfully send GetDocSumsPacket.");
+ receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1);
+
+ return convertBasicPackets(receivedPackets);
+ }
+
+ /**
+ * Returns an array of the hits contained in a result
+ *
+ * @param filled
+ * true to return all hits, false to return only unfilled hits
+ * @return array of docids, empty array if no hits
+ */
+ private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) {
+ DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()];
+ int x = 0;
+
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (hit instanceof FastHit) {
+ FastHit fastHit = (FastHit) hit;
+ if (filled || !fastHit.isFilled(summaryClass)) {
+ packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
+ x++;
+ }
+ }
+ }
+ if (x < packetKeys.length) {
+ DocsumPacketKey[] tmp = new DocsumPacketKey[x];
+
+ System.arraycopy(packetKeys, 0, tmp, 0, x);
+ return tmp;
+ } else {
+ return packetKeys;
+ }
+ }
+
+ private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException {
+ // trying to cast a BasicPacket[] to Packet[] will compile,
+ // but lead to a runtime error. At least that's what I got
+ // from testing and reading the specification. I'm just happy
+ // if someone tells me what's the proper Java way of doing
+ // this. -SK
+ Packet[] packets = new Packet[basicPackets.length];
+
+ for (int i = 0; i < basicPackets.length; i++) {
+ packets[i] = (Packet) basicPackets[i];
+ }
+ return packets;
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+
+ private Logger getLogger() {
+ return searcher.getLogger();
+ }
+
+ private boolean isLoggingFine() {
+ return getLogger().isLoggable(Level.FINE);
+ }
+}