diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-14 15:40:46 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-14 15:40:46 +0200 |
commit | 99205db587a3897fb38c685883b5d52161215e71 (patch) | |
tree | b6a9a35f5c6e9e511dda02c428c28844620335ff /container-search | |
parent | d166163770dde9d20975220457f4f6eb45336919 (diff) |
FS4 interfacing code moved to FS4CloseableChannel from FastSearcher
Diffstat (limited to 'container-search')
7 files changed, 432 insertions, 399 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java index 85e1aef3da0..6f87e45af25 100644 --- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java +++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java @@ -4,9 +4,11 @@ package com.yahoo.fs4; import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.log.LogLevel; +import com.yahoo.prelude.fastsearch.TimeoutException; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Optional; import java.util.logging.Logger; @@ -325,4 +327,20 @@ public abstract class BasicPacket { return false; } + /** + * Throws an IOException if the packet is not of the expected type + */ + public void ensureInstanceOf(Class<? extends BasicPacket> type, String name) throws IOException { + if ((type.isAssignableFrom(getClass()))) return; + + if (this instanceof ErrorPacket) { + ErrorPacket errorPacket = (ErrorPacket) this; + if (errorPacket.getErrorCode() == 8) + throw new TimeoutException("Query timed out in " + name); + else + throw new IOException("Received error from backend in " + name + ": " + this); + } else { + throw new IOException("Received " + this + " when expecting " + type); + } + } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java new file mode 100644 index 00000000000..dc95f83365e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java @@ -0,0 +1,351 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.DocsumPacket; +import com.yahoo.fs4.GetDocSumsPacket; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryResultPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.CloseableChannel; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.result.HitGroup; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; + +/** + * {@link CloseableChannel} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4CloseableChannel extends CloseableChannel { + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + private final Optional<Integer> distributionKey; + + public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, + int distributionKey) { + this.searcher = searcher; + this.distributionKey = Optional.of(distributionKey); + + Backend backend = fs4ResourcePool.getBackend(hostname, port); + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + // fdispatch code path + public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, Backend backend) { + this.searcher = searcher; + this.distributionKey = Optional.empty(); + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + @Override + public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + if (isLoggingFine()) + getLogger().finest("sending query packet"); + + try { + boolean couldSend = channel.sendPacket(queryPacket); + if (!couldSend) + return new Result(query, ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'")); + } catch (InvalidChannelException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel " + getName())); + } catch (IllegalStateException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); + } + + BasicPacket[] basicPackets; + + try { + basicPackets = channel.receivePackets(query.getTimeLeft(), 1); + } catch (ChannelTimeoutException e) { + return new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + } catch (InvalidChannelException e) { + return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); + } + + if (basicPackets.length == 0) { + return new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); + } + + if (isLoggingFine()) + getLogger().finest("got packets " + basicPackets.length + " packets"); + + basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); + QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; + + if (isLoggingFine()) + getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + + if (query.getPresentation().getSummary() == null) + query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); + + Result result = new Result(query); + + searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); + + searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey); + Packet[] packets; + CacheControl cacheControl = searcher.getCacheControl(); + PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); + + if (packetWrapper != null) { + cacheControl.updateCacheEntry(cacheKey, query, resultPacket); + } else { + if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) { + // Don't add error here, it was done in first phase + // No check if packetWrapper already exists, since incomplete + // first phase data won't be cached anyway. + } else { + packets = new Packet[1]; + packets[0] = resultPacket; + cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey); + } + } + return result; + } + + @Override + public void partialFill(Result result, String summaryClass) { + Packet[] receivedPackets; + DocsumPacketKey[] packetKeys; + + CacheKey cacheKey = null; + PacketWrapper packetWrapper = null; + if (searcher.getCacheControl().useCache(channel.getQuery())) { + cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); + if (cacheKey == null) { + QueryPacket queryPacket = QueryPacket.create(channel.getQuery()); + cacheKey = new CacheKey(queryPacket); + } + packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); + } + + if (countFastHits(result) > 0) { + packetKeys = getPacketKeys(result, summaryClass, false); + if (packetKeys.length == 0) { + receivedPackets = new Packet[0]; + } else { + try { + receivedPackets = fetchSummaries(result, summaryClass); + } catch (InvalidChannelException e) { + result.hits() + .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + return; + } catch (ChannelTimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); + return; + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); + return; + } + if (receivedPackets.length == 0) { + result.hits() + .addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); + return; + } + } + } else { + packetKeys = new DocsumPacketKey[0]; + receivedPackets = new Packet[0]; + } + + int skippedHits; + try { + FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass); + skippedHits = fillHitsResult.skippedHits; + if (fillHitsResult.error != null) { + result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); + return; + } + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); + return; + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); + return; + } + if (skippedHits == 0 && packetWrapper != null) { + searcher.getCacheControl().updateCacheEntry(cacheKey, channel.getQuery(), packetKeys, receivedPackets); + } + + if (skippedHits > 0) + result.hits().addError( + ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); + result.analyzeHits(); + + if (channel.getQuery().getTraceLevel() >= 3) { + int hitNumber = 0; + for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { + com.yahoo.search.result.Hit hit = i.next(); + if (!(hit instanceof FastHit)) + continue; + FastHit fastHit = (FastHit) hit; + + String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend"); + if (!fastHit.isFilled(summaryClass)) + traceMsg += ". Error, hit, not filled"; + channel.getQuery().trace(traceMsg, false, 3); + } + } + } + + @Override + public void closeChannel() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) { + Query query = result.getQuery(); + PacketWrapper packetWrapper = searcher.getCacheControl().lookup(cacheKey, query); + + if (packetWrapper == null) { + return null; + } + if (packetWrapper.getNumPackets() != 0) { + for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { + Hit hit = i.next(); + + if (hit instanceof FastHit) { + FastHit fastHit = (FastHit) hit; + DocsumPacketKey key = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); + + if (searcher.fillHit(fastHit, (DocsumPacket) packetWrapper.getPacket(key), summaryClass).ok) { + fastHit.setCached(true); + } + + } + } + result.hits().setSorted(false); + result.analyzeHits(); + } + + return packetWrapper; + } + + private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) { + for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) { + Hit h = i.next(); + if (h instanceof FastHit) { + FastHit hit = (FastHit) h; + if (hit.isFilled(summaryClass)) { + continue; + } + if (hit.getCacheKey() != null) { + return hit.getCacheKey(); + } + } + } + return null; + } + + private int countFastHits(Result result) { + int count = 0; + for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { + if (i.next() instanceof FastHit) + count++; + } + return count; + } + + private Packet[] fetchSummaries(Result result, String summaryClass) + throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { + + BasicPacket[] receivedPackets; + boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery()); + if (result.getQuery().getTraceLevel() >= 3) + result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3); + + GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); + int compressionLimit = result.getQuery().properties().getInteger(FastSearcher.PACKET_COMPRESSION_LIMIT, 0); + docsumsPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) { + docsumsPacket.setCompressionType(result.getQuery().properties().getString(FastSearcher.PACKET_COMPRESSION_TYPE, "lz4")); + } + + boolean couldSend = channel.sendPacket(docsumsPacket); + if (!couldSend) + throw new IOException("Could not successfully send GetDocSumsPacket."); + receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1); + + return convertBasicPackets(receivedPackets); + } + + /** + * Returns an array of the hits contained in a result + * + * @param filled + * true to return all hits, false to return only unfilled hits + * @return array of docids, empty array if no hits + */ + private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) { + DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()]; + int x = 0; + + for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { + com.yahoo.search.result.Hit hit = i.next(); + if (hit instanceof FastHit) { + FastHit fastHit = (FastHit) hit; + if (filled || !fastHit.isFilled(summaryClass)) { + packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); + x++; + } + } + } + if (x < packetKeys.length) { + DocsumPacketKey[] tmp = new DocsumPacketKey[x]; + + System.arraycopy(packetKeys, 0, tmp, 0, x); + return tmp; + } else { + return packetKeys; + } + } + + private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { + // trying to cast a BasicPacket[] to Packet[] will compile, + // but lead to a runtime error. At least that's what I got + // from testing and reading the specification. I'm just happy + // if someone tells me what's the proper Java way of doing + // this. -SK + Packet[] packets = new Packet[basicPackets.length]; + + for (int i = 0; i < basicPackets.length; i++) { + packets[i] = (Packet) basicPackets[i]; + } + return packets; + } + + private String getName() { + return searcher.getName(); + } + + private Logger getLogger() { + return searcher.getLogger(); + } + + private boolean isLoggingFine() { + return getLogger().isLoggable(Level.FINE); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 333d3970cc4..d34d119c1fe 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,18 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; -import java.util.Optional; - import com.yahoo.compress.CompressionType; import com.yahoo.container.search.LegacyEmulationConfig; import com.yahoo.fs4.BasicPacket; import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.fs4.Packet; import com.yahoo.fs4.PingPacket; import com.yahoo.fs4.PongPacket; import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.QueryResultPacket; import com.yahoo.fs4.mplex.Backend; import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; @@ -29,13 +24,11 @@ import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.search.result.HitGroup; import com.yahoo.search.searchchain.Execution; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; -import java.util.Iterator; +import java.util.Optional; import java.util.logging.Level; import static com.yahoo.container.util.Util.quote; @@ -99,15 +92,6 @@ public class FastSearcher extends VespaBackEndSearcher { this.dispatcher = dispatcher; } - private int countFastHits(Result result) { - int count = 0; - for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { - if (i.next() instanceof FastHit) - count++; - } - return count; - } - /** * Pings the backend. Does not propagate to other searchers. */ @@ -153,7 +137,7 @@ public class FastSearcher extends VespaBackEndSearcher { } try { - ensureInstanceOf(PongPacket.class, packets[0], name); + packets[0].ensureInstanceOf(PongPacket.class, name); } catch (TimeoutException e) { return new Pong(ErrorMessage.createTimeout(e.getMessage())); } catch (IOException e) { @@ -176,9 +160,7 @@ public class FastSearcher extends VespaBackEndSearcher { if (dispatcher.searchCluster().groupSize() == 1) forceSinglePassGrouping(query); try(CloseableChannel channel = getChannel(query)) { - channel.setQuery(query); - - Result result = searchTwoPhase(channel, query, queryPacket, cacheKey); + Result result = channel.search(query, queryPacket, cacheKey); if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) { // There is currently no correct choice for which @@ -214,7 +196,7 @@ public class FastSearcher extends VespaBackEndSearcher { } /** - * Returns an interface object to issue a search request over. + * Returns a request interface object for the given query. * Normally this is built from the backend field of this instance, which connects to the dispatch node * this component talks to (which is why this instance was chosen by the cluster controller). However, * under certain conditions we will instead return an interface which connects directly to the relevant @@ -222,24 +204,24 @@ public class FastSearcher extends VespaBackEndSearcher { */ private CloseableChannel getChannel(Query query) { if (query.properties().getBoolean(dispatchInternal, false)) { - Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(query); + Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(this, query); if (dispatchedChannel.isPresent()) { return dispatchedChannel.get(); } } if (!query.properties().getBoolean(dispatchDirect, true)) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); if (!directDispatchRecipient.isPresent()) - return new CloseableChannel(dispatchBackend); + return new FS4CloseableChannel(this, query, dispatchBackend); // Dispatch directly to the single, local search node + SearchCluster.Node local = directDispatchRecipient.get(); query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(), - directDispatchRecipient.get().fs4port()), Optional.of(directDispatchRecipient.get().key())); + return new FS4CloseableChannel(this, query, fs4ResourcePool, local.hostname(), local.fs4port(), local.key()); } /** @@ -267,86 +249,8 @@ public class FastSearcher extends VespaBackEndSearcher { return; } - CacheKey cacheKey = null; - PacketWrapper packetWrapper = null; - if (getCacheControl().useCache(query)) { - cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); - if (cacheKey == null) { - QueryPacket queryPacket = QueryPacket.create(query); - cacheKey = new CacheKey(queryPacket); - } - packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); - } - - Packet[] receivedPackets; - try(CloseableChannel channel = getChannel(query)) { - channel.setQuery(query); - DocsumPacketKey[] packetKeys; - - if (countFastHits(result) > 0) { - packetKeys = getPacketKeys(result, summaryClass, false); - if (packetKeys.length == 0) { - receivedPackets = new Packet[0]; - } else { - try { - receivedPackets = fetchSummaries(channel, result, summaryClass); - } catch (InvalidChannelException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); - return; - } catch (ChannelTimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); - return; - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError( - "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); - return; - } - if (receivedPackets.length == 0) { - result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); - return; - } - } - } else { - packetKeys = new DocsumPacketKey[0]; - receivedPackets = new Packet[0]; - } - - int skippedHits; - try { - FillHitsResult fillHitsResult = fillHits(result, receivedPackets, summaryClass); - skippedHits = fillHitsResult.skippedHits; - if (fillHitsResult.error != null) { - result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error)); - return; - } - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout(e.getMessage())); - return; - } catch (IOException e) { - result.hits().addError(ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); - return; - } - if (skippedHits == 0 && packetWrapper != null) { - cacheControl.updateCacheEntry(cacheKey, query, packetKeys, receivedPackets); - } - - if ( skippedHits > 0 ) - result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits")); - result.analyzeHits(); - - if (query.getTraceLevel() >= 3) { - int hitNumber = 0; - for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { - com.yahoo.search.result.Hit hit = i.next(); - if ( ! (hit instanceof FastHit)) continue; - FastHit fastHit = (FastHit) hit; - - String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend" ); - if ( ! fastHit.isFilled(summaryClass)) - traceMsg += ". Error, hit, not filled"; - query.trace(traceMsg, false, 3); - } - } + try (CloseableChannel channel = getChannel(query)) { + channel.partialFill(result, summaryClass); } } @@ -362,158 +266,10 @@ public class FastSearcher extends VespaBackEndSearcher { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } - private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) { - for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) { - Hit h = i.next(); - if (h instanceof FastHit) { - FastHit hit = (FastHit) h; - if (hit.isFilled(summaryClass)) { - continue; - } - if (hit.getCacheKey() != null) { - return hit.getCacheKey(); - } - } - } - return null; - } - - private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { - if (isLoggingFine()) - getLogger().finest("sending query packet"); - - try { - boolean couldSend = channel.sendPacket(queryPacket); - if ( ! couldSend) - return new Result(query,ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'")); - } catch (InvalidChannelException e) { - return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel " + getName())); - } catch (IllegalStateException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); - } - - BasicPacket[] basicPackets; - - try { - basicPackets = channel.receivePackets(query.getTimeLeft(), 1); - } catch (ChannelTimeoutException e) { - return new Result(query,ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } catch (InvalidChannelException e) { - return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); - } - - if (basicPackets.length == 0) { - return new Result(query,ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); - } - - if (isLoggingFine()) - getLogger().finest("got packets " + basicPackets.length + " packets"); - - ensureInstanceOf(QueryResultPacket.class, basicPackets[0], getName()); - QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - - if (isLoggingFine()) - getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); - - if (query.getPresentation().getSummary() == null) - query.getPresentation().setSummary(getDefaultDocsumClass()); - - Result result = new Result(query); - - addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result, false); - - addUnfilledHits(result, resultPacket.getDocuments(), false, - queryPacket.getQueryPacketData(), cacheKey, channel.distributionKey()); - Packet[] packets; - PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); - - if (packetWrapper != null) { - cacheControl.updateCacheEntry(cacheKey, query, resultPacket); - } - else { - if (resultPacket.getCoverageFeature() && ! resultPacket.getCoverageFull()) { - // Don't add error here, it was done in first phase - // No check if packetWrapper already exists, since incomplete - // first phase data won't be cached anyway. - } else { - packets = new Packet[1]; - packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, channel.distributionKey()); - } - } - return result; - } - - private Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException { - // trying to cast a BasicPacket[] to Packet[] will compile, - // but lead to a runtime error. At least that's what I got - // from testing and reading the specification. I'm just happy - // if someone tells me what's the proper Java way of doing - // this. -SK - Packet[] packets = new Packet[basicPackets.length]; - - for (int i = 0; i < basicPackets.length; i++) { - packets[i] = (Packet) basicPackets[i]; - } - return packets; - } - - private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass) - throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { - - BasicPacket[] receivedPackets; - boolean summaryNeedsQuery = summaryNeedsQuery(result.getQuery()); - if (result.getQuery().getTraceLevel() >=3) - result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3); - - GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery); - int compressionLimit = result.getQuery().properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); - docsumsPacket.setCompressionLimit(compressionLimit); - if (compressionLimit != 0) { - docsumsPacket.setCompressionType(result.getQuery().properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); - } - - boolean couldSend = channel.sendPacket(docsumsPacket); - if ( ! couldSend) throw new IOException("Could not successfully send GetDocSumsPacket."); - receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1); - - return convertBasicPackets(receivedPackets); - } - public String toString() { return "fast searcher (" + getName() + ") " + dispatchBackend; } - /** - * Returns an array of the hits contained in this result - * - * @param filled true to return all hits, false to return only unfilled hits - * @return array of docids, empty array if no hits - */ - private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) { - DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()]; - int x = 0; - - for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) { - com.yahoo.search.result.Hit hit = i.next(); - if (hit instanceof FastHit) { - FastHit fastHit = (FastHit) hit; - if(filled || !fastHit.isFilled(summaryClass)) { - packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); - x++; - } - } - } - if (x < packetKeys.length) { - DocsumPacketKey[] tmp = new DocsumPacketKey[x]; - - System.arraycopy(packetKeys, 0, tmp, 0, x); - return tmp; - } else { - return packetKeys; - } - } - protected boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 3e9a92ea0f7..a6f98418a76 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -1,15 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; -import java.util.Optional; import com.yahoo.collections.TinyIdentitySet; -import com.yahoo.fs4.BasicPacket; import com.yahoo.fs4.DocsumPacket; import com.yahoo.fs4.DocumentInfo; -import com.yahoo.fs4.ErrorPacket; -import com.yahoo.fs4.QueryPacketData; import com.yahoo.fs4.Packet; import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryPacketData; import com.yahoo.fs4.QueryResultPacket; import com.yahoo.io.GrowableByteBuffer; import com.yahoo.io.HexDump; @@ -42,7 +39,9 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.logging.Level; +import java.util.logging.Logger; /** @@ -53,8 +52,8 @@ import java.util.logging.Level; @SuppressWarnings("deprecation") public abstract class VespaBackEndSearcher extends PingableSearcher { - protected static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); - protected static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); + static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); + static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype"); protected static final CompoundName TRACE_DISABLE = new CompoundName("trace.disable"); /** The set of all document databases available in the backend handled by this searcher */ @@ -65,7 +64,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { private String defaultDocsumClass = null; /** Returns an iterator which returns all hits below this result **/ - protected Iterator<Hit> hitIterator(Result result) { + static Iterator<Hit> hitIterator(Result result) { return result.hits().unorderedDeepIterator(); } @@ -75,7 +74,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { /** Cache wrapper */ protected CacheControl cacheControl = null; - protected final String getName() { return name; } + public final String getName() { return name; } protected final String getDefaultDocsumClass() { return defaultDocsumClass; } /** Sets default document summary class. Default is null */ @@ -84,6 +83,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { /** Returns the packet cache controller of this */ public final CacheControl getCacheControl() { return cacheControl; } + public final Logger getLogger() { return super.getLogger(); } + /** * Searches a search cluster * This is an endpoint - searchers will never propagate the search to any nested searcher. @@ -101,7 +102,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * Returns whether we need to send the query when fetching summaries. * This is necessary if the query requests summary features or dynamic snippeting */ - protected boolean summaryNeedsQuery(Query query) { + boolean summaryNeedsQuery(Query query) { if (query.getRanking().getQueryCache()) return false; // Query is cached in backend DocumentDatabase documentDb = getDocumentDatabase(query); @@ -135,7 +136,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { Result result = new Result(query); QueryResultPacket resultPacket = packetWrapper.getFirstResultPacket(); - addMetaInfo(query, queryPacketData, resultPacket, result, true); + addMetaInfo(query, queryPacketData, resultPacket, result); if (packetWrapper.getNumPackets() == 0) addUnfilledHits(result, documents, true, queryPacketData, key, packetWrapper.distributionKey()); else @@ -400,7 +401,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { } } - protected void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result, boolean fromCache) { + void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) { result.setTotalHitCount(resultPacket.getTotalDocumentCount()); // Grouping @@ -429,7 +430,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { } } - static private class FillHitResult { + static class FillHitResult { final boolean ok; final String error; FillHitResult(boolean ok) { @@ -440,7 +441,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { this.error = error; } } - private FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { + + FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) { if (packet != null) { byte[] docsumdata = packet.getData(); if (docsumdata.length > 0) { @@ -464,7 +466,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * @return the number of hits that we did not return data for, and an optional error message. * when things are working normally we return 0. */ - protected FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException { + public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException { int skippedHits = 0; String lastError = null; int packetIndex = 0; @@ -474,7 +476,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) { FastHit fastHit = (FastHit) hit; - ensureInstanceOf(DocsumPacket.class, packets[packetIndex], getName()); + packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName()); DocsumPacket docsum = (DocsumPacket) packets[packetIndex]; packetIndex++; @@ -493,23 +495,6 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return new FillHitsResult(skippedHits, lastError); } - /** - * Throws an IOException if the packet is not of the expected type - */ - protected static void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet, String name) throws IOException { - if ((type.isAssignableFrom(packet.getClass()))) return; - - if (packet instanceof ErrorPacket) { - ErrorPacket errorPacket=(ErrorPacket)packet; - if (errorPacket.getErrorCode() == 8) - throw new TimeoutException("Query timed out in " + name); - else - throw new IOException("Received error from backend in " + name + ": " + packet); - } else { - throw new IOException("Received " + packet + " when expecting " + type); - } - } - private boolean addCachedHits(Result result, PacketWrapper packetWrapper, String summaryClass, @@ -562,34 +547,6 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { hit.setPartId(document.getPartId()); } - protected PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) { - Query query = result.getQuery(); - PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); - - if (packetWrapper == null) { - return null; - } - if (packetWrapper.getNumPackets() != 0) { - for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { - Hit hit = i.next(); - - if (hit instanceof FastHit) { - FastHit fastHit = (FastHit) hit; - DocsumPacketKey key = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); - - if (fillHit(fastHit, (DocsumPacket) packetWrapper.getPacket(key), summaryClass).ok) { - fastHit.setCached(true); - } - - } - } - result.hits().setSorted(false); - result.analyzeHits(); - } - - return packetWrapper; - } - protected DocsumDefinitionSet getDocsumDefinitionSet(Query query) { DocumentDatabase db = getDocumentDatabase(query); return db.getDocsumDefinitionSet(); @@ -620,13 +577,12 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * Only set if produced directly by a search node, not dispatch * (in which case it is not set in the received packets.) */ - boolean addUnfilledHits(Result result, + void addUnfilledHits(Result result, List<DocumentInfo> documents, boolean fromCache, QueryPacketData queryPacketData, CacheKey cacheKey, Optional<Integer> channelDistributionKey) { - boolean allHitsOK = true; Query myQuery = result.getQuery(); for (DocumentInfo document : documents) { @@ -646,14 +602,11 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { result.hits().add(hit); } catch (ConfigurationException e) { - allHitsOK = false; getLogger().log(LogLevel.WARNING, "Skipping hit", e); } catch (Exception e) { - allHitsOK = false; getLogger().log(LogLevel.ERROR, "Skipping malformed hit", e); } } - return allHitsOK; } @SuppressWarnings("rawtypes") diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java index 643b8f81318..3f5ebe53d0d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java @@ -1,54 +1,41 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.mplex.Backend; -import com.yahoo.fs4.mplex.FS4Channel; -import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.CacheKey; import com.yahoo.search.Query; +import com.yahoo.search.Result; import java.io.Closeable; import java.io.IOException; -import java.util.Optional; /** + * CloseableChannel is an interface for running a search query and getting document summaries against some + * content node, node group or dispatcher while abstracting the specifics of the invocation target. + * * @author ollivir */ -public class CloseableChannel implements Closeable { - private FS4Channel channel; - private final Optional<Integer> distributionKey; +public abstract class CloseableChannel implements Closeable { + /** Retrieve the hits for the given {@link Query} */ + public abstract Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException; - public CloseableChannel(Backend backend) { - this(backend, Optional.empty()); - } - - public CloseableChannel(Backend backend, Optional<Integer> distributionKey) { - this.channel = backend.openChannel(); - this.distributionKey = distributionKey; - } + /** Retrieve document summaries for the unfilled hits in the given {@link Result} */ + public abstract void partialFill(Result result, String summaryClass); - public void setQuery(Query query) { - channel.setQuery(query); - } + protected abstract void closeChannel(); - public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException { - return channel.sendPacket(packet); - } - - public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException { - return channel.receivePackets(timeout, packetCount); - } + private Runnable teardown = null; - public Optional<Integer> distributionKey() { - return distributionKey; + public void teardown(Runnable teardown) { + this.teardown = teardown; } @Override - public void close() { - if (channel != null) { - channel.close(); - channel = null; + public final void close() { + if (teardown != null) { + teardown.run(); + teardown = null; } + closeChannel(); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java deleted file mode 100644 index d005d9491d5..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.yahoo.prelude.fastsearch.FS4ResourcePool; -import com.yahoo.search.dispatch.SearchCluster.Group; -import com.yahoo.search.dispatch.SearchCluster.Node; - -import java.util.Optional; - -/** - * An extension to CloseableChannel that encapsulates the release of a LoadBalancer group allocation. - * - * @author ollivir - */ -public class DispatchedChannel extends CloseableChannel { - private final SearchCluster.Group group; - private final LoadBalancer loadBalancer; - private boolean groupAllocated = true; - - public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group, Node node) { - super(fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), Optional.of(node.key())); - - this.loadBalancer = loadBalancer; - this.group = group; - } - - public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group) { - this(fs4ResourcePool, loadBalancer, group, group.nodes().iterator().next()); - } - - public void close() { - if (groupAllocated) { - groupAllocated = false; - loadBalancer.releaseGroup(group); - } - super.close(); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index c383b681558..0cf18852dd3 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -8,18 +8,20 @@ import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; import com.yahoo.container.protect.Error; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.slime.ArrayTraverser; +import com.yahoo.data.access.Inspector; import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FS4CloseableChannel; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.query.SessionId; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; -import com.yahoo.data.access.Inspector; +import com.yahoo.slime.ArrayTraverser; import com.yahoo.slime.BinaryFormat; import com.yahoo.slime.Cursor; import com.yahoo.slime.Slime; @@ -52,7 +54,7 @@ public class Dispatcher extends AbstractComponent { /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; - + /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections; @@ -84,7 +86,7 @@ public class Dispatcher extends AbstractComponent { this.fs4ResourcePool = null; this.loadBalancer = new LoadBalancer(searchCluster); } - + /** Returns the search cluster this dispatches to */ public SearchCluster searchCluster() { return searchCluster; } @@ -283,14 +285,18 @@ public class Dispatcher extends AbstractComponent { } - public Optional<CloseableChannel> getDispatchedChannel(Query query) { + public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) { Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); return groupInCluster.flatMap(group -> { if(group.nodes().size() == 1) { SearchCluster.Node node = group.nodes().iterator().next(); query.trace(false, 2, "Dispatching internally to ", group, " (", node.toString(), ")"); - return Optional.of(new DispatchedChannel(fs4ResourcePool, loadBalancer, group)); + CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); + channel.teardown(() -> { + loadBalancer.releaseGroup(group); + }); + return Optional.of(channel); } else { loadBalancer.releaseGroup(group); return Optional.empty(); |