summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-09-14 15:40:46 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-09-14 15:40:46 +0200
commit99205db587a3897fb38c685883b5d52161215e71 (patch)
treeb6a9a35f5c6e9e511dda02c428c28844620335ff /container-search
parentd166163770dde9d20975220457f4f6eb45336919 (diff)
FS4 interfacing code moved to FS4CloseableChannel from FastSearcher
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/fs4/BasicPacket.java18
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java351
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java268
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java83
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java53
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java38
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java20
7 files changed, 432 insertions, 399 deletions
diff --git a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
index 85e1aef3da0..6f87e45af25 100644
--- a/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
+++ b/container-search/src/main/java/com/yahoo/fs4/BasicPacket.java
@@ -4,9 +4,11 @@ package com.yahoo.fs4;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.fastsearch.TimeoutException;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.logging.Logger;
@@ -325,4 +327,20 @@ public abstract class BasicPacket {
return false;
}
+ /**
+ * Throws an IOException if the packet is not of the expected type
+ */
+ public void ensureInstanceOf(Class<? extends BasicPacket> type, String name) throws IOException {
+ if ((type.isAssignableFrom(getClass()))) return;
+
+ if (this instanceof ErrorPacket) {
+ ErrorPacket errorPacket = (ErrorPacket) this;
+ if (errorPacket.getErrorCode() == 8)
+ throw new TimeoutException("Query timed out in " + name);
+ else
+ throw new IOException("Received error from backend in " + name + ": " + this);
+ } else {
+ throw new IOException("Received " + this + " when expecting " + type);
+ }
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
new file mode 100644
index 00000000000..dc95f83365e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
@@ -0,0 +1,351 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.DocsumPacket;
+import com.yahoo.fs4.GetDocSumsPacket;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.fs4.QueryResultPacket;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.CloseableChannel;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.result.HitGroup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
+
+/**
+ * {@link CloseableChannel} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4CloseableChannel extends CloseableChannel {
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+ private final Optional<Integer> distributionKey;
+
+ public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
+ int distributionKey) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.of(distributionKey);
+
+ Backend backend = fs4ResourcePool.getBackend(hostname, port);
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ // fdispatch code path
+ public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.empty();
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ @Override
+ public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ if (isLoggingFine())
+ getLogger().finest("sending query packet");
+
+ try {
+ boolean couldSend = channel.sendPacket(queryPacket);
+ if (!couldSend)
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"));
+ } catch (InvalidChannelException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()));
+ } catch (IllegalStateException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
+ }
+
+ BasicPacket[] basicPackets;
+
+ try {
+ basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
+ } catch (ChannelTimeoutException e) {
+ return new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ } catch (InvalidChannelException e) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
+ }
+
+ if (basicPackets.length == 0) {
+ return new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
+ }
+
+ if (isLoggingFine())
+ getLogger().finest("got packets " + basicPackets.length + " packets");
+
+ basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName());
+ QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
+
+ if (isLoggingFine())
+ getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
+
+ if (query.getPresentation().getSummary() == null)
+ query.getPresentation().setSummary(searcher.getDefaultDocsumClass());
+
+ Result result = new Result(query);
+
+ searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result);
+
+ searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey);
+ Packet[] packets;
+ CacheControl cacheControl = searcher.getCacheControl();
+ PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
+
+ if (packetWrapper != null) {
+ cacheControl.updateCacheEntry(cacheKey, query, resultPacket);
+ } else {
+ if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) {
+ // Don't add error here, it was done in first phase
+ // No check if packetWrapper already exists, since incomplete
+ // first phase data won't be cached anyway.
+ } else {
+ packets = new Packet[1];
+ packets[0] = resultPacket;
+ cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void partialFill(Result result, String summaryClass) {
+ Packet[] receivedPackets;
+ DocsumPacketKey[] packetKeys;
+
+ CacheKey cacheKey = null;
+ PacketWrapper packetWrapper = null;
+ if (searcher.getCacheControl().useCache(channel.getQuery())) {
+ cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
+ if (cacheKey == null) {
+ QueryPacket queryPacket = QueryPacket.create(channel.getQuery());
+ cacheKey = new CacheKey(queryPacket);
+ }
+ packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
+ }
+
+ if (countFastHits(result) > 0) {
+ packetKeys = getPacketKeys(result, summaryClass, false);
+ if (packetKeys.length == 0) {
+ receivedPackets = new Packet[0];
+ } else {
+ try {
+ receivedPackets = fetchSummaries(result, summaryClass);
+ } catch (InvalidChannelException e) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ return;
+ } catch (ChannelTimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
+ return;
+ }
+ if (receivedPackets.length == 0) {
+ result.hits()
+ .addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
+ return;
+ }
+ }
+ } else {
+ packetKeys = new DocsumPacketKey[0];
+ receivedPackets = new Packet[0];
+ }
+
+ int skippedHits;
+ try {
+ FillHitsResult fillHitsResult = searcher.fillHits(result, receivedPackets, summaryClass);
+ skippedHits = fillHitsResult.skippedHits;
+ if (fillHitsResult.error != null) {
+ result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
+ return;
+ }
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout(e.getMessage()));
+ return;
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
+ return;
+ }
+ if (skippedHits == 0 && packetWrapper != null) {
+ searcher.getCacheControl().updateCacheEntry(cacheKey, channel.getQuery(), packetKeys, receivedPackets);
+ }
+
+ if (skippedHits > 0)
+ result.hits().addError(
+ ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits"));
+ result.analyzeHits();
+
+ if (channel.getQuery().getTraceLevel() >= 3) {
+ int hitNumber = 0;
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (!(hit instanceof FastHit))
+ continue;
+ FastHit fastHit = (FastHit) hit;
+
+ String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend");
+ if (!fastHit.isFilled(summaryClass))
+ traceMsg += ". Error, hit, not filled";
+ channel.getQuery().trace(traceMsg, false, 3);
+ }
+ }
+ }
+
+ @Override
+ public void closeChannel() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) {
+ Query query = result.getQuery();
+ PacketWrapper packetWrapper = searcher.getCacheControl().lookup(cacheKey, query);
+
+ if (packetWrapper == null) {
+ return null;
+ }
+ if (packetWrapper.getNumPackets() != 0) {
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ Hit hit = i.next();
+
+ if (hit instanceof FastHit) {
+ FastHit fastHit = (FastHit) hit;
+ DocsumPacketKey key = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
+
+ if (searcher.fillHit(fastHit, (DocsumPacket) packetWrapper.getPacket(key), summaryClass).ok) {
+ fastHit.setCached(true);
+ }
+
+ }
+ }
+ result.hits().setSorted(false);
+ result.analyzeHits();
+ }
+
+ return packetWrapper;
+ }
+
+ private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) {
+ for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (h instanceof FastHit) {
+ FastHit hit = (FastHit) h;
+ if (hit.isFilled(summaryClass)) {
+ continue;
+ }
+ if (hit.getCacheKey() != null) {
+ return hit.getCacheKey();
+ }
+ }
+ }
+ return null;
+ }
+
+ private int countFastHits(Result result) {
+ int count = 0;
+ for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
+ if (i.next() instanceof FastHit)
+ count++;
+ }
+ return count;
+ }
+
+ private Packet[] fetchSummaries(Result result, String summaryClass)
+ throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
+
+ BasicPacket[] receivedPackets;
+ boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery());
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3);
+
+ GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery);
+ int compressionLimit = result.getQuery().properties().getInteger(FastSearcher.PACKET_COMPRESSION_LIMIT, 0);
+ docsumsPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0) {
+ docsumsPacket.setCompressionType(result.getQuery().properties().getString(FastSearcher.PACKET_COMPRESSION_TYPE, "lz4"));
+ }
+
+ boolean couldSend = channel.sendPacket(docsumsPacket);
+ if (!couldSend)
+ throw new IOException("Could not successfully send GetDocSumsPacket.");
+ receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1);
+
+ return convertBasicPackets(receivedPackets);
+ }
+
+ /**
+ * Returns an array of the hits contained in a result
+ *
+ * @param filled
+ * true to return all hits, false to return only unfilled hits
+ * @return array of docids, empty array if no hits
+ */
+ private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) {
+ DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()];
+ int x = 0;
+
+ for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
+ com.yahoo.search.result.Hit hit = i.next();
+ if (hit instanceof FastHit) {
+ FastHit fastHit = (FastHit) hit;
+ if (filled || !fastHit.isFilled(summaryClass)) {
+ packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
+ x++;
+ }
+ }
+ }
+ if (x < packetKeys.length) {
+ DocsumPacketKey[] tmp = new DocsumPacketKey[x];
+
+ System.arraycopy(packetKeys, 0, tmp, 0, x);
+ return tmp;
+ } else {
+ return packetKeys;
+ }
+ }
+
+ private static Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException {
+ // trying to cast a BasicPacket[] to Packet[] will compile,
+ // but lead to a runtime error. At least that's what I got
+ // from testing and reading the specification. I'm just happy
+ // if someone tells me what's the proper Java way of doing
+ // this. -SK
+ Packet[] packets = new Packet[basicPackets.length];
+
+ for (int i = 0; i < basicPackets.length; i++) {
+ packets[i] = (Packet) basicPackets[i];
+ }
+ return packets;
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+
+ private Logger getLogger() {
+ return searcher.getLogger();
+ }
+
+ private boolean isLoggingFine() {
+ return getLogger().isLoggable(Level.FINE);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 333d3970cc4..d34d119c1fe 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -1,18 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
-import java.util.Optional;
-
import com.yahoo.compress.CompressionType;
import com.yahoo.container.search.LegacyEmulationConfig;
import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.ChannelTimeoutException;
-import com.yahoo.fs4.GetDocSumsPacket;
-import com.yahoo.fs4.Packet;
import com.yahoo.fs4.PingPacket;
import com.yahoo.fs4.PongPacket;
import com.yahoo.fs4.QueryPacket;
-import com.yahoo.fs4.QueryResultPacket;
import com.yahoo.fs4.mplex.Backend;
import com.yahoo.fs4.mplex.FS4Channel;
import com.yahoo.fs4.mplex.InvalidChannelException;
@@ -29,13 +24,11 @@ import com.yahoo.search.grouping.GroupingRequest;
import com.yahoo.search.grouping.request.GroupingOperation;
import com.yahoo.search.query.Ranking;
import com.yahoo.search.result.ErrorMessage;
-import com.yahoo.search.result.Hit;
-import com.yahoo.search.result.HitGroup;
import com.yahoo.search.searchchain.Execution;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
-import java.util.Iterator;
+import java.util.Optional;
import java.util.logging.Level;
import static com.yahoo.container.util.Util.quote;
@@ -99,15 +92,6 @@ public class FastSearcher extends VespaBackEndSearcher {
this.dispatcher = dispatcher;
}
- private int countFastHits(Result result) {
- int count = 0;
- for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
- if (i.next() instanceof FastHit)
- count++;
- }
- return count;
- }
-
/**
* Pings the backend. Does not propagate to other searchers.
*/
@@ -153,7 +137,7 @@ public class FastSearcher extends VespaBackEndSearcher {
}
try {
- ensureInstanceOf(PongPacket.class, packets[0], name);
+ packets[0].ensureInstanceOf(PongPacket.class, name);
} catch (TimeoutException e) {
return new Pong(ErrorMessage.createTimeout(e.getMessage()));
} catch (IOException e) {
@@ -176,9 +160,7 @@ public class FastSearcher extends VespaBackEndSearcher {
if (dispatcher.searchCluster().groupSize() == 1)
forceSinglePassGrouping(query);
try(CloseableChannel channel = getChannel(query)) {
- channel.setQuery(query);
-
- Result result = searchTwoPhase(channel, query, queryPacket, cacheKey);
+ Result result = channel.search(query, queryPacket, cacheKey);
if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) {
// There is currently no correct choice for which
@@ -214,7 +196,7 @@ public class FastSearcher extends VespaBackEndSearcher {
}
/**
- * Returns an interface object to issue a search request over.
+ * Returns a request interface object for the given query.
* Normally this is built from the backend field of this instance, which connects to the dispatch node
* this component talks to (which is why this instance was chosen by the cluster controller). However,
* under certain conditions we will instead return an interface which connects directly to the relevant
@@ -222,24 +204,24 @@ public class FastSearcher extends VespaBackEndSearcher {
*/
private CloseableChannel getChannel(Query query) {
if (query.properties().getBoolean(dispatchInternal, false)) {
- Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(query);
+ Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(this, query);
if (dispatchedChannel.isPresent()) {
return dispatchedChannel.get();
}
}
if (!query.properties().getBoolean(dispatchDirect, true))
- return new CloseableChannel(dispatchBackend);
+ return new FS4CloseableChannel(this, query, dispatchBackend);
if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
- return new CloseableChannel(dispatchBackend);
+ return new FS4CloseableChannel(this, query, dispatchBackend);
Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
if (!directDispatchRecipient.isPresent())
- return new CloseableChannel(dispatchBackend);
+ return new FS4CloseableChannel(this, query, dispatchBackend);
// Dispatch directly to the single, local search node
+ SearchCluster.Node local = directDispatchRecipient.get();
query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get());
- return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(),
- directDispatchRecipient.get().fs4port()), Optional.of(directDispatchRecipient.get().key()));
+ return new FS4CloseableChannel(this, query, fs4ResourcePool, local.hostname(), local.fs4port(), local.key());
}
/**
@@ -267,86 +249,8 @@ public class FastSearcher extends VespaBackEndSearcher {
return;
}
- CacheKey cacheKey = null;
- PacketWrapper packetWrapper = null;
- if (getCacheControl().useCache(query)) {
- cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
- if (cacheKey == null) {
- QueryPacket queryPacket = QueryPacket.create(query);
- cacheKey = new CacheKey(queryPacket);
- }
- packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
- }
-
- Packet[] receivedPackets;
- try(CloseableChannel channel = getChannel(query)) {
- channel.setQuery(query);
- DocsumPacketKey[] packetKeys;
-
- if (countFastHits(result) > 0) {
- packetKeys = getPacketKeys(result, summaryClass, false);
- if (packetKeys.length == 0) {
- receivedPackets = new Packet[0];
- } else {
- try {
- receivedPackets = fetchSummaries(channel, result, summaryClass);
- } catch (InvalidChannelException e) {
- result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
- return;
- } catch (ChannelTimeoutException e) {
- result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
- return;
- } catch (IOException e) {
- result.hits().addError(ErrorMessage.createBackendCommunicationError(
- "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
- return;
- }
- if (receivedPackets.length == 0) {
- result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
- return;
- }
- }
- } else {
- packetKeys = new DocsumPacketKey[0];
- receivedPackets = new Packet[0];
- }
-
- int skippedHits;
- try {
- FillHitsResult fillHitsResult = fillHits(result, receivedPackets, summaryClass);
- skippedHits = fillHitsResult.skippedHits;
- if (fillHitsResult.error != null) {
- result.hits().addError(ErrorMessage.createTimeout(fillHitsResult.error));
- return;
- }
- } catch (TimeoutException e) {
- result.hits().addError(ErrorMessage.createTimeout(e.getMessage()));
- return;
- } catch (IOException e) {
- result.hits().addError(ErrorMessage.createBackendCommunicationError("Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
- return;
- }
- if (skippedHits == 0 && packetWrapper != null) {
- cacheControl.updateCacheEntry(cacheKey, query, packetKeys, receivedPackets);
- }
-
- if ( skippedHits > 0 )
- result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit data for summary '" + summaryClass + "' for " + skippedHits + " hits"));
- result.analyzeHits();
-
- if (query.getTraceLevel() >= 3) {
- int hitNumber = 0;
- for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
- com.yahoo.search.result.Hit hit = i.next();
- if ( ! (hit instanceof FastHit)) continue;
- FastHit fastHit = (FastHit) hit;
-
- String traceMsg = "Hit: " + (hitNumber++) + " from " + (fastHit.isCached() ? "cache" : "backend" );
- if ( ! fastHit.isFilled(summaryClass))
- traceMsg += ". Error, hit, not filled";
- query.trace(traceMsg, false, 3);
- }
- }
+ try (CloseableChannel channel = getChannel(query)) {
+ channel.partialFill(result, summaryClass);
}
}
@@ -362,158 +266,10 @@ public class FastSearcher extends VespaBackEndSearcher {
return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass));
}
- private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) {
- for (Iterator<Hit> i = hits.unorderedDeepIterator(); i.hasNext();) {
- Hit h = i.next();
- if (h instanceof FastHit) {
- FastHit hit = (FastHit) h;
- if (hit.isFilled(summaryClass)) {
- continue;
- }
- if (hit.getCacheKey() != null) {
- return hit.getCacheKey();
- }
- }
- }
- return null;
- }
-
- private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
- if (isLoggingFine())
- getLogger().finest("sending query packet");
-
- try {
- boolean couldSend = channel.sendPacket(queryPacket);
- if ( ! couldSend)
- return new Result(query,ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"));
- } catch (InvalidChannelException e) {
- return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()));
- } catch (IllegalStateException e) {
- return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
- }
-
- BasicPacket[] basicPackets;
-
- try {
- basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
- } catch (ChannelTimeoutException e) {
- return new Result(query,ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
- } catch (InvalidChannelException e) {
- return new Result(query,ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
- }
-
- if (basicPackets.length == 0) {
- return new Result(query,ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
- }
-
- if (isLoggingFine())
- getLogger().finest("got packets " + basicPackets.length + " packets");
-
- ensureInstanceOf(QueryResultPacket.class, basicPackets[0], getName());
- QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
-
- if (isLoggingFine())
- getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
-
- if (query.getPresentation().getSummary() == null)
- query.getPresentation().setSummary(getDefaultDocsumClass());
-
- Result result = new Result(query);
-
- addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result, false);
-
- addUnfilledHits(result, resultPacket.getDocuments(), false,
- queryPacket.getQueryPacketData(), cacheKey, channel.distributionKey());
- Packet[] packets;
- PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
-
- if (packetWrapper != null) {
- cacheControl.updateCacheEntry(cacheKey, query, resultPacket);
- }
- else {
- if (resultPacket.getCoverageFeature() && ! resultPacket.getCoverageFull()) {
- // Don't add error here, it was done in first phase
- // No check if packetWrapper already exists, since incomplete
- // first phase data won't be cached anyway.
- } else {
- packets = new Packet[1];
- packets[0] = resultPacket;
- cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, channel.distributionKey());
- }
- }
- return result;
- }
-
- private Packet[] convertBasicPackets(BasicPacket[] basicPackets) throws ClassCastException {
- // trying to cast a BasicPacket[] to Packet[] will compile,
- // but lead to a runtime error. At least that's what I got
- // from testing and reading the specification. I'm just happy
- // if someone tells me what's the proper Java way of doing
- // this. -SK
- Packet[] packets = new Packet[basicPackets.length];
-
- for (int i = 0; i < basicPackets.length; i++) {
- packets[i] = (Packet) basicPackets[i];
- }
- return packets;
- }
-
- private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass)
- throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
-
- BasicPacket[] receivedPackets;
- boolean summaryNeedsQuery = summaryNeedsQuery(result.getQuery());
- if (result.getQuery().getTraceLevel() >=3)
- result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3);
-
- GetDocSumsPacket docsumsPacket = GetDocSumsPacket.create(result, summaryClass, summaryNeedsQuery);
- int compressionLimit = result.getQuery().properties().getInteger(PACKET_COMPRESSION_LIMIT, 0);
- docsumsPacket.setCompressionLimit(compressionLimit);
- if (compressionLimit != 0) {
- docsumsPacket.setCompressionType(result.getQuery().properties().getString(PACKET_COMPRESSION_TYPE, "lz4"));
- }
-
- boolean couldSend = channel.sendPacket(docsumsPacket);
- if ( ! couldSend) throw new IOException("Could not successfully send GetDocSumsPacket.");
- receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1);
-
- return convertBasicPackets(receivedPackets);
- }
-
public String toString() {
return "fast searcher (" + getName() + ") " + dispatchBackend;
}
- /**
- * Returns an array of the hits contained in this result
- *
- * @param filled true to return all hits, false to return only unfilled hits
- * @return array of docids, empty array if no hits
- */
- private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) {
- DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()];
- int x = 0;
-
- for (Iterator<com.yahoo.search.result.Hit> i = hitIterator(result); i.hasNext();) {
- com.yahoo.search.result.Hit hit = i.next();
- if (hit instanceof FastHit) {
- FastHit fastHit = (FastHit) hit;
- if(filled || !fastHit.isFilled(summaryClass)) {
- packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
- x++;
- }
- }
- }
- if (x < packetKeys.length) {
- DocsumPacketKey[] tmp = new DocsumPacketKey[x];
-
- System.arraycopy(packetKeys, 0, tmp, 0, x);
- return tmp;
- } else {
- return packetKeys;
- }
- }
-
protected boolean isLoggingFine() {
return getLogger().isLoggable(Level.FINE);
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index 3e9a92ea0f7..a6f98418a76 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -1,15 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
-import java.util.Optional;
import com.yahoo.collections.TinyIdentitySet;
-import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.DocsumPacket;
import com.yahoo.fs4.DocumentInfo;
-import com.yahoo.fs4.ErrorPacket;
-import com.yahoo.fs4.QueryPacketData;
import com.yahoo.fs4.Packet;
import com.yahoo.fs4.QueryPacket;
+import com.yahoo.fs4.QueryPacketData;
import com.yahoo.fs4.QueryResultPacket;
import com.yahoo.io.GrowableByteBuffer;
import com.yahoo.io.HexDump;
@@ -42,7 +39,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.logging.Level;
+import java.util.logging.Logger;
/**
@@ -53,8 +52,8 @@ import java.util.logging.Level;
@SuppressWarnings("deprecation")
public abstract class VespaBackEndSearcher extends PingableSearcher {
- protected static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit");
- protected static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype");
+ static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit");
+ static final CompoundName PACKET_COMPRESSION_TYPE = new CompoundName("packetcompressiontype");
protected static final CompoundName TRACE_DISABLE = new CompoundName("trace.disable");
/** The set of all document databases available in the backend handled by this searcher */
@@ -65,7 +64,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
private String defaultDocsumClass = null;
/** Returns an iterator which returns all hits below this result **/
- protected Iterator<Hit> hitIterator(Result result) {
+ static Iterator<Hit> hitIterator(Result result) {
return result.hits().unorderedDeepIterator();
}
@@ -75,7 +74,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
/** Cache wrapper */
protected CacheControl cacheControl = null;
- protected final String getName() { return name; }
+ public final String getName() { return name; }
protected final String getDefaultDocsumClass() { return defaultDocsumClass; }
/** Sets default document summary class. Default is null */
@@ -84,6 +83,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
/** Returns the packet cache controller of this */
public final CacheControl getCacheControl() { return cacheControl; }
+ public final Logger getLogger() { return super.getLogger(); }
+
/**
* Searches a search cluster
* This is an endpoint - searchers will never propagate the search to any nested searcher.
@@ -101,7 +102,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
* Returns whether we need to send the query when fetching summaries.
* This is necessary if the query requests summary features or dynamic snippeting
*/
- protected boolean summaryNeedsQuery(Query query) {
+ boolean summaryNeedsQuery(Query query) {
if (query.getRanking().getQueryCache()) return false; // Query is cached in backend
DocumentDatabase documentDb = getDocumentDatabase(query);
@@ -135,7 +136,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
Result result = new Result(query);
QueryResultPacket resultPacket = packetWrapper.getFirstResultPacket();
- addMetaInfo(query, queryPacketData, resultPacket, result, true);
+ addMetaInfo(query, queryPacketData, resultPacket, result);
if (packetWrapper.getNumPackets() == 0)
addUnfilledHits(result, documents, true, queryPacketData, key, packetWrapper.distributionKey());
else
@@ -400,7 +401,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
}
}
- protected void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result, boolean fromCache) {
+ void addMetaInfo(Query query, QueryPacketData queryPacketData, QueryResultPacket resultPacket, Result result) {
result.setTotalHitCount(resultPacket.getTotalDocumentCount());
// Grouping
@@ -429,7 +430,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
}
}
- static private class FillHitResult {
+ static class FillHitResult {
final boolean ok;
final String error;
FillHitResult(boolean ok) {
@@ -440,7 +441,8 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
this.error = error;
}
}
- private FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) {
+
+ FillHitResult fillHit(FastHit hit, DocsumPacket packet, String summaryClass) {
if (packet != null) {
byte[] docsumdata = packet.getData();
if (docsumdata.length > 0) {
@@ -464,7 +466,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
* @return the number of hits that we did not return data for, and an optional error message.
* when things are working normally we return 0.
*/
- protected FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException {
+ public FillHitsResult fillHits(Result result, Packet[] packets, String summaryClass) throws IOException {
int skippedHits = 0;
String lastError = null;
int packetIndex = 0;
@@ -474,7 +476,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
if (hit instanceof FastHit && ! hit.isFilled(summaryClass)) {
FastHit fastHit = (FastHit) hit;
- ensureInstanceOf(DocsumPacket.class, packets[packetIndex], getName());
+ packets[packetIndex].ensureInstanceOf(DocsumPacket.class, getName());
DocsumPacket docsum = (DocsumPacket) packets[packetIndex];
packetIndex++;
@@ -493,23 +495,6 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
return new FillHitsResult(skippedHits, lastError);
}
- /**
- * Throws an IOException if the packet is not of the expected type
- */
- protected static void ensureInstanceOf(Class<? extends BasicPacket> type, BasicPacket packet, String name) throws IOException {
- if ((type.isAssignableFrom(packet.getClass()))) return;
-
- if (packet instanceof ErrorPacket) {
- ErrorPacket errorPacket=(ErrorPacket)packet;
- if (errorPacket.getErrorCode() == 8)
- throw new TimeoutException("Query timed out in " + name);
- else
- throw new IOException("Received error from backend in " + name + ": " + packet);
- } else {
- throw new IOException("Received " + packet + " when expecting " + type);
- }
- }
-
private boolean addCachedHits(Result result,
PacketWrapper packetWrapper,
String summaryClass,
@@ -562,34 +547,6 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
hit.setPartId(document.getPartId());
}
- protected PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) {
- Query query = result.getQuery();
- PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
-
- if (packetWrapper == null) {
- return null;
- }
- if (packetWrapper.getNumPackets() != 0) {
- for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
- Hit hit = i.next();
-
- if (hit instanceof FastHit) {
- FastHit fastHit = (FastHit) hit;
- DocsumPacketKey key = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
-
- if (fillHit(fastHit, (DocsumPacket) packetWrapper.getPacket(key), summaryClass).ok) {
- fastHit.setCached(true);
- }
-
- }
- }
- result.hits().setSorted(false);
- result.analyzeHits();
- }
-
- return packetWrapper;
- }
-
protected DocsumDefinitionSet getDocsumDefinitionSet(Query query) {
DocumentDatabase db = getDocumentDatabase(query);
return db.getDocsumDefinitionSet();
@@ -620,13 +577,12 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
* Only set if produced directly by a search node, not dispatch
* (in which case it is not set in the received packets.)
*/
- boolean addUnfilledHits(Result result,
+ void addUnfilledHits(Result result,
List<DocumentInfo> documents,
boolean fromCache,
QueryPacketData queryPacketData,
CacheKey cacheKey,
Optional<Integer> channelDistributionKey) {
- boolean allHitsOK = true;
Query myQuery = result.getQuery();
for (DocumentInfo document : documents) {
@@ -646,14 +602,11 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
result.hits().add(hit);
} catch (ConfigurationException e) {
- allHitsOK = false;
getLogger().log(LogLevel.WARNING, "Skipping hit", e);
} catch (Exception e) {
- allHitsOK = false;
getLogger().log(LogLevel.ERROR, "Skipping malformed hit", e);
}
}
- return allHitsOK;
}
@SuppressWarnings("rawtypes")
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
index 643b8f81318..3f5ebe53d0d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
@@ -1,54 +1,41 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.fs4.BasicPacket;
-import com.yahoo.fs4.ChannelTimeoutException;
-import com.yahoo.fs4.mplex.Backend;
-import com.yahoo.fs4.mplex.FS4Channel;
-import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.prelude.fastsearch.CacheKey;
import com.yahoo.search.Query;
+import com.yahoo.search.Result;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Optional;
/**
+ * CloseableChannel is an interface for running a search query and getting document summaries against some
+ * content node, node group or dispatcher while abstracting the specifics of the invocation target.
+ *
* @author ollivir
*/
-public class CloseableChannel implements Closeable {
- private FS4Channel channel;
- private final Optional<Integer> distributionKey;
+public abstract class CloseableChannel implements Closeable {
+ /** Retrieve the hits for the given {@link Query} */
+ public abstract Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException;
- public CloseableChannel(Backend backend) {
- this(backend, Optional.empty());
- }
-
- public CloseableChannel(Backend backend, Optional<Integer> distributionKey) {
- this.channel = backend.openChannel();
- this.distributionKey = distributionKey;
- }
+ /** Retrieve document summaries for the unfilled hits in the given {@link Result} */
+ public abstract void partialFill(Result result, String summaryClass);
- public void setQuery(Query query) {
- channel.setQuery(query);
- }
+ protected abstract void closeChannel();
- public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
- return channel.sendPacket(packet);
- }
-
- public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException {
- return channel.receivePackets(timeout, packetCount);
- }
+ private Runnable teardown = null;
- public Optional<Integer> distributionKey() {
- return distributionKey;
+ public void teardown(Runnable teardown) {
+ this.teardown = teardown;
}
@Override
- public void close() {
- if (channel != null) {
- channel.close();
- channel = null;
+ public final void close() {
+ if (teardown != null) {
+ teardown.run();
+ teardown = null;
}
+ closeChannel();
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java
deleted file mode 100644
index d005d9491d5..00000000000
--- a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
-
-import com.yahoo.prelude.fastsearch.FS4ResourcePool;
-import com.yahoo.search.dispatch.SearchCluster.Group;
-import com.yahoo.search.dispatch.SearchCluster.Node;
-
-import java.util.Optional;
-
-/**
- * An extension to CloseableChannel that encapsulates the release of a LoadBalancer group allocation.
- *
- * @author ollivir
- */
-public class DispatchedChannel extends CloseableChannel {
- private final SearchCluster.Group group;
- private final LoadBalancer loadBalancer;
- private boolean groupAllocated = true;
-
- public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group, Node node) {
- super(fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), Optional.of(node.key()));
-
- this.loadBalancer = loadBalancer;
- this.group = group;
- }
-
- public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group) {
- this(fs4ResourcePool, loadBalancer, group, group.nodes().iterator().next());
- }
-
- public void close() {
- if (groupAllocated) {
- groupAllocated = false;
- loadBalancer.releaseGroup(group);
- }
- super.close();
- }
-}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index c383b681558..0cf18852dd3 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -8,18 +8,20 @@ import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.container.protect.Error;
-import com.yahoo.prelude.fastsearch.DocumentDatabase;
-import com.yahoo.slime.ArrayTraverser;
+import com.yahoo.data.access.Inspector;
import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.FS4CloseableChannel;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.query.SessionId;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
-import com.yahoo.data.access.Inspector;
+import com.yahoo.slime.ArrayTraverser;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Slime;
@@ -52,7 +54,7 @@ public class Dispatcher extends AbstractComponent {
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
-
+
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
@@ -84,7 +86,7 @@ public class Dispatcher extends AbstractComponent {
this.fs4ResourcePool = null;
this.loadBalancer = new LoadBalancer(searchCluster);
}
-
+
/** Returns the search cluster this dispatches to */
public SearchCluster searchCluster() { return searchCluster; }
@@ -283,14 +285,18 @@ public class Dispatcher extends AbstractComponent {
}
- public Optional<CloseableChannel> getDispatchedChannel(Query query) {
+ public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) {
Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
return groupInCluster.flatMap(group -> {
if(group.nodes().size() == 1) {
SearchCluster.Node node = group.nodes().iterator().next();
query.trace(false, 2, "Dispatching internally to ", group, " (", node.toString(), ")");
- return Optional.of(new DispatchedChannel(fs4ResourcePool, loadBalancer, group));
+ CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key());
+ channel.teardown(() -> {
+ loadBalancer.releaseGroup(group);
+ });
+ return Optional.of(channel);
} else {
loadBalancer.releaseGroup(group);
return Optional.empty();