diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java | 268 |
1 files changed, 12 insertions, 256 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 333d3970cc4..d34d119c1fe 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,18 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; -import java.util.Optional; - import com.yahoo.compress.CompressionType; import com.yahoo.container.search.LegacyEmulationConfig; import com.yahoo.fs4.BasicPacket; import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.fs4.Packet; import com.yahoo.fs4.PingPacket; import com.yahoo.fs4.PongPacket; import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.QueryResultPacket; import com.yahoo.fs4.mplex.Backend; import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; @@ -29,13 +24,11 @@ import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.search.result.HitGroup; import com.yahoo.search.searchchain.Execution; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; -import java.util.Iterator; +import java.util.Optional; import java.util.logging.Level; import static com.yahoo.container.util.Util.quote; @@ -99,15 +92,6 @@ public class FastSearcher extends VespaBackEndSearcher { this.dispatcher = dispatcher; } - private int countFastHits(Result result) { - int count = 0; - for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { - if (i.next() instanceof FastHit) - count++; - } - return count; - } - /** * Pings the backend. Does not propagate to other searchers. */ @@ -153,7 +137,7 @@ public class FastSearcher extends VespaBackEndSearcher { } try { - ensureInstanceOf(PongPacket.class, packets[0], name); + packets[0].ensureInstanceOf(PongPacket.class, name); } catch (TimeoutException e) { return new Pong(ErrorMessage.createTimeout(e.getMessage())); } catch (IOException e) { @@ -176,9 +160,7 @@ public class FastSearcher extends VespaBackEndSearcher { if (dispatcher.searchCluster().groupSize() == 1) forceSinglePassGrouping(query); try(CloseableChannel channel = getChannel(query)) { - channel.setQuery(query); - - Result result = searchTwoPhase(channel, query, queryPacket, cacheKey); + Result result = channel.search(query, queryPacket, cacheKey); if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) { // There is currently no correct choice for which @@ -214,7 +196,7 @@ public class FastSearcher extends VespaBackEndSearcher { } /** - * Returns an interface object to issue a search request over. + * Returns a request interface object for the given query. * Normally this is built from the backend field of this instance, which connects to the dispatch node * this component talks to (which is why this instance was chosen by the cluster controller). However, * under certain conditions we will instead return an interface which connects directly to the relevant @@ -222,24 +204,24 @@ public class FastSearcher extends VespaBackEndSearcher { */ private CloseableChannel getChannel(Query query) { if (query.properties().getBoolean(dispatchInternal, false)) { - Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(query); + Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(this, query); if (dispatchedChannel.isPresent()) { return dispatchedChannel.get(); } } if (!query.properties().getBoolean(dispatchDirect, true)) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); if (!directDispatchRecipient.isPresent()) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); // Dispatch directly to the single, local search node + SearchCluster.Node local = directDispatchRecipient.get(); query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), - directDispatchRecipient.get().fs4port()), Optional.of(directDispatchRecipient.get().key())); + return new FS4CloseableChannel(this, query, fs4ResourcePool, local.hostname(), local.fs4port(), local.key()); } /** @@ -267,86 +249,8 @@ public class FastSearcher extends VespaBackEndSearcher { return; } - CacheKey cacheKey = null; - PacketWrapper packetWrapper = null; - if (getCacheControl().useCache(query)) { - cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); - if (cacheKey == null) { - QueryPacket queryPacket = QueryPacket.create(query); - cacheKey = new CacheKey(queryPacket); - } - packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); - } - - Packet[] receivedPackets; - try(CloseableChannel channel = getChannel(query)) { - channel.setQuery(query); - DocsumPacketKey[] packetKeys; - - if (countFastHits(result) > 0) { - packetKeys = getPacketKeys(result, summaryClass, false); - if (packetKeys.length == 0) { - receivedPackets = new Packet[0]; - } else { - try { - receivedPackets = fetchSummaries(channel, result, summaryClass); - } catch (InvalidChannelException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); - return; - } catch (ChannelTimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); - return; - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError( - "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); - return; - } - if (receivedPackets.length == 0) { - result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); - return; - } - } - } else { - packetKeys = new DocsumPacketKey[0]; - receivedPackets = new Packet[0]; - } - - int skippedHits; - try { - FillHitsResult fillHitsResult = fillHits(result, receivedPackets, summaryClass); - skippedHits = fillHitsResult.skippedHits; - if (fillHitsResult.error != null) { - result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); - return; - } - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); - return; - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); - return; - } - if (skippedHits == 0 && packetWrapper != null) { - cacheControl.updateCacheEntry(cacheKey, query, packetKeys, receivedPackets); - } - - if ( skippedHits > 0 ) - result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); - result.analyzeHits(); - - if (query.getTraceLevel() >= 3) { - int hitNumber = 0; - for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { - com.yahoo.search.result.Hit hit = i.next(); - if ( ! (hit instanceof FastHit)) continue; - FastHit fastHit = (FastHit) hit; - - String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend" ); - if ( ! fastHit.isFilled(summaryClass)) - traceMsg += ". Error, hit, not filled"; - query.trace(traceMsg, false, 3); - } - } + try (CloseableChannel channel = getChannel(query)) { + channel.partialFill(result, summaryClass); } } @@ -362,158 +266,10 @@ public class FastSearcher extends VespaBackEndSearcher { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } - private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) { - for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) { - Hit h = i.next(); - if (h instanceof FastHit) { - FastHit hit = (FastHit) h; - if (hit.isFilled(summaryClass)) { - continue; - } - if (hit.getCacheKey() != null) { - return hit.getCacheKey(); - } - } - } - return null; - } - - private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { - if (isLoggingFine()) - getLogger().finest("sending query packet"); - - try { - boolean couldSend = channel.sendPacket(queryPacket); - if ( ! couldSend) - return new Result(query,ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'")); - } catch (InvalidChannelException e) { - return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel " + getName())); - } catch (IllegalStateException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); - } - - BasicPacket[] basicPackets; - - try { - basicPackets = channel.receivePackets(query.getTimeLeft(), 1); - } catch (ChannelTimeoutException e) { - return new Result(query,ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } catch (InvalidChannelException e) { - return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); - } - - if (basicPackets.length == 0) { - return new Result(query,ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); - } - - if (isLoggingFine()) - getLogger().finest("got packets " + basicPackets.length + " packets"); - - ensureInstanceOf(QueryResultPacket.class, basicPackets[0], getName()); - QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - - if (isLoggingFine()) - getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); - - if (query.getPresentation().getSummary() == null) - query.getPresentation().setSummary(getDefaultDocsumClass()); - - Result result = new Result(query); - - addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result, false); - - addUnfilledHits(result, resultPacket.getDocuments(), false, - queryPacket.getQueryPacketData(), cacheKey, channel.distributionKey()); - Packet[] packets; - PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); - - if (packetWrapper != null) { - cacheControl.updateCacheEntry(cacheKey, query, resultPacket); - } - else { - if (resultPacket.getCoverageFeature() && ! resultPacket.getCoverageFull()) { - // Don't add error here, it was done in first phase - // No check if packetWrapper already exists, since incomplete - // first phase data won't be cached anyway. - } else { - packets = new Packet[1]; - packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, channel.distributionKey()); - } - } - return result; - } - - private Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { - // trying to cast a BasicPacket[] to Packet[] will compile, - // but lead to a runtime error. At least that's what I got - // from testing and reading the specification. I'm just happy - // if someone tells me what's the proper Java way of doing - // this. -SK - Packet[] packets = new Packet[basicPackets.length]; - - for (int i = 0; i < basicPackets.length; i++) { - packets[i] = (Packet) basicPackets[i]; - } - return packets; - } - - private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass) - throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { - - BasicPacket[] receivedPackets; - boolean summaryNeedsQuery = summaryNeedsQuery(result.getQuery()); - if (result.getQuery().getTraceLevel() >=3) - result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3); - - GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); - int compressionLimit = result.getQuery().properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); - docsumsPacket.setCompressionLimit(compressionLimit); - if (compressionLimit != 0) { - docsumsPacket.setCompressionType(result.getQuery().properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); - } - - boolean couldSend = channel.sendPacket(docsumsPacket); - if ( ! couldSend) throw new IOException("Could not successfully send GetDocSumsPacket."); - receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1); - - return convertBasicPackets(receivedPackets); - } - public String toString() { return "fast searcher (" + getName() + ") " + dispatchBackend; } - /** - * Returns an array of the hits contained in this result - * - * @param filled true to return all hits, false to return only unfilled hits - * @return array of docids, empty array if no hits - */ - private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) { - DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()]; - int x = 0; - - for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { - com.yahoo.search.result.Hit hit = i.next(); - if (hit instanceof FastHit) { - FastHit fastHit = (FastHit) hit; - if(filled || !fastHit.isFilled(summaryClass)) { - packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); - x++; - } - } - } - if (x < packetKeys.length) { - DocsumPacketKey[] tmp = new DocsumPacketKey[x]; - - System.arraycopy(packetKeys, 0, tmp, 0, x); - return tmp; - } else { - return packetKeys; - } - } - protected boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } |