aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-09-24 14:45:25 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-09-24 14:45:25 +0200
commit6002813ae4d7e012409f6d43abe5a83bddc63f06 (patch)
treef7911b8a516172b55ef45692c8f35451cb91e85b /container-search
parent3a18bab028097dec03bac8186d06c23536fda1c0 (diff)
Multiple node java dispatch support
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java137
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java44
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java17
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java28
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java39
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java98
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java5
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java15
8 files changed, 288 insertions, 95 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
index dc95f83365e..10a640c54c8 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
@@ -21,15 +21,17 @@ import com.yahoo.search.result.HitGroup;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
+import static java.util.Arrays.asList;
/**
* {@link CloseableChannel} implementation for FS4 nodes and fdispatch
- *
+ *
* @author ollivir
*/
public class FS4CloseableChannel extends CloseableChannel {
@@ -37,6 +39,14 @@ public class FS4CloseableChannel extends CloseableChannel {
private FS4Channel channel;
private final Optional<Integer> distributionKey;
+ private ErrorMessage pendingSearchError = null;
+ private Query query = null;
+ private QueryPacket queryPacket = null;
+
+ private int expectedFillResults = 0;
+ private CacheKey summaryCacheKey = null;
+ private DocsumPacketKey[] summaryPacketKeys = null;
+
public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
int distributionKey) {
this.searcher = searcher;
@@ -56,32 +66,47 @@ public class FS4CloseableChannel extends CloseableChannel {
}
@Override
- public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
if (isLoggingFine())
getLogger().finest("sending query packet");
+ if(queryPacket == null) {
+ // query changed for subchannel
+ queryPacket = searcher.createQueryPacket(query);
+ }
+
+ this.query = query;
+ this.queryPacket = queryPacket;
+
try {
boolean couldSend = channel.sendPacket(queryPacket);
- if (!couldSend)
- return new Result(query, ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"));
+ if (!couldSend) {
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'");
+ }
} catch (InvalidChannelException e) {
- return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()));
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName());
} catch (IllegalStateException e) {
- return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()));
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage());
}
+ }
+ @Override
+ protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
+ if(pendingSearchError != null) {
+ return asList(new Result(query, pendingSearchError));
+ }
BasicPacket[] basicPackets;
try {
basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
} catch (ChannelTimeoutException e) {
- return new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
+ return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())));
} catch (InvalidChannelException e) {
- return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()));
+ return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())));
}
if (basicPackets.length == 0) {
- return new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"));
+ return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")));
}
if (isLoggingFine())
@@ -118,53 +143,67 @@ public class FS4CloseableChannel extends CloseableChannel {
cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey);
}
}
- return result;
+ return asList(result);
}
@Override
- public void partialFill(Result result, String summaryClass) {
- Packet[] receivedPackets;
- DocsumPacketKey[] packetKeys;
-
- CacheKey cacheKey = null;
- PacketWrapper packetWrapper = null;
+ protected void sendPartialFillRequest(Result result, String summaryClass) {
+ summaryCacheKey = null;
if (searcher.getCacheControl().useCache(channel.getQuery())) {
- cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
- if (cacheKey == null) {
+ summaryCacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
+ if (summaryCacheKey == null) {
QueryPacket queryPacket = QueryPacket.create(channel.getQuery());
- cacheKey = new CacheKey(queryPacket);
+ summaryCacheKey = new CacheKey(queryPacket);
+ }
+ boolean cacheFound = cacheLookupTwoPhase(summaryCacheKey, result, summaryClass);
+ if (!cacheFound) {
+ summaryCacheKey = null;
}
- packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
}
if (countFastHits(result) > 0) {
- packetKeys = getPacketKeys(result, summaryClass, false);
- if (packetKeys.length == 0) {
- receivedPackets = new Packet[0];
+ summaryPacketKeys = getPacketKeys(result, summaryClass);
+ if (summaryPacketKeys.length == 0) {
+ expectedFillResults = 0;
} else {
try {
- receivedPackets = fetchSummaries(result, summaryClass);
+ expectedFillResults = requestSummaries(result, summaryClass);
} catch (InvalidChannelException e) {
result.hits()
.addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
return;
- } catch (ChannelTimeoutException e) {
- result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
- return;
} catch (IOException e) {
result.hits().addError(ErrorMessage.createBackendCommunicationError(
"IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage()));
return;
}
- if (receivedPackets.length == 0) {
- result.hits()
- .addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
- return;
- }
}
} else {
- packetKeys = new DocsumPacketKey[0];
- receivedPackets = new Packet[0];
+ expectedFillResults = 0;
+ }
+ }
+
+
+ @Override
+ protected void getPartialFillResults(Result result, String summaryClass) {
+ if (expectedFillResults == 0) {
+ return;
+ }
+
+ Packet[] receivedPackets;
+ try {
+ receivedPackets = getSummaryResponses(result);
+ } catch (InvalidChannelException e1) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)"));
+ return;
+ } catch (ChannelTimeoutException e1) {
+ result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName()));
+ return;
+ }
+
+ if (receivedPackets.length == 0) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)"));
+ return;
}
int skippedHits;
@@ -183,8 +222,8 @@ public class FS4CloseableChannel extends CloseableChannel {
"Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage()));
return;
}
- if (skippedHits == 0 && packetWrapper != null) {
- searcher.getCacheControl().updateCacheEntry(cacheKey, channel.getQuery(), packetKeys, receivedPackets);
+ if (skippedHits == 0 && summaryCacheKey != null) {
+ searcher.getCacheControl().updateCacheEntry(summaryCacheKey, channel.getQuery(), summaryPacketKeys, receivedPackets);
}
if (skippedHits > 0)
@@ -216,12 +255,12 @@ public class FS4CloseableChannel extends CloseableChannel {
}
}
- private PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) {
+ private boolean cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) {
Query query = result.getQuery();
PacketWrapper packetWrapper = searcher.getCacheControl().lookup(cacheKey, query);
if (packetWrapper == null) {
- return null;
+ return false;
}
if (packetWrapper.getNumPackets() != 0) {
for (Iterator<Hit> i = hitIterator(result); i.hasNext();) {
@@ -241,7 +280,7 @@ public class FS4CloseableChannel extends CloseableChannel {
result.analyzeHits();
}
- return packetWrapper;
+ return true;
}
private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) {
@@ -269,10 +308,8 @@ public class FS4CloseableChannel extends CloseableChannel {
return count;
}
- private Packet[] fetchSummaries(Result result, String summaryClass)
- throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
+ private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException {
- BasicPacket[] receivedPackets;
boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery());
if (result.getQuery().getTraceLevel() >= 3)
result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3);
@@ -287,7 +324,15 @@ public class FS4CloseableChannel extends CloseableChannel {
boolean couldSend = channel.sendPacket(docsumsPacket);
if (!couldSend)
throw new IOException("Could not successfully send GetDocSumsPacket.");
- receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1);
+
+ return docsumsPacket.getNumDocsums() + 1;
+ }
+
+ private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException {
+ if(expectedFillResults == 0) {
+ return new Packet[0];
+ }
+ BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults);
return convertBasicPackets(receivedPackets);
}
@@ -295,11 +340,9 @@ public class FS4CloseableChannel extends CloseableChannel {
/**
* Returns an array of the hits contained in a result
*
- * @param filled
- * true to return all hits, false to return only unfilled hits
* @return array of docids, empty array if no hits
*/
- private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) {
+ private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass) {
DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()];
int x = 0;
@@ -307,7 +350,7 @@ public class FS4CloseableChannel extends CloseableChannel {
com.yahoo.search.result.Hit hit = i.next();
if (hit instanceof FastHit) {
FastHit fastHit = (FastHit) hit;
- if (filled || !fastHit.isFilled(summaryClass)) {
+ if (!fastHit.isFilled(summaryClass)) {
packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass);
x++;
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index d34d119c1fe..9acf48a7c67 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -28,6 +28,7 @@ import com.yahoo.search.searchchain.Execution;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
+import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
@@ -64,9 +65,9 @@ public class FastSearcher extends VespaBackEndSearcher {
private final Dispatcher dispatcher;
private final Backend dispatchBackend;
-
+
private final FS4ResourcePool fs4ResourcePool;
-
+
/**
* Creates a Fastsearcher.
*
@@ -99,7 +100,7 @@ public class FastSearcher extends VespaBackEndSearcher {
public Pong ping(Ping ping, Execution execution) {
return ping(ping, dispatchBackend, getName());
}
-
+
public static Pong ping(Ping ping, Backend backend, String name) {
FS4Channel channel = backend.openPingChannel();
@@ -151,6 +152,7 @@ public class FastSearcher extends VespaBackEndSearcher {
}
}
+ @Override
protected void transformQuery(Query query) {
QueryRewrite.rewriteSddocname(query);
}
@@ -160,7 +162,8 @@ public class FastSearcher extends VespaBackEndSearcher {
if (dispatcher.searchCluster().groupSize() == 1)
forceSinglePassGrouping(query);
try(CloseableChannel channel = getChannel(query)) {
- Result result = channel.search(query, queryPacket, cacheKey);
+ List<Result> results = channel.search(query, queryPacket, cacheKey);
+ Result result = mergeResults(results, query, execution);
if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) {
// There is currently no correct choice for which
@@ -182,13 +185,13 @@ public class FastSearcher extends VespaBackEndSearcher {
return result;
}
}
-
+
/** When we only search a single node, doing all grouping in one pass is more efficient */
private void forceSinglePassGrouping(Query query) {
for (GroupingRequest groupingRequest : query.getSelect().getGrouping())
forceSinglePassGrouping(groupingRequest.getRootOperation());
}
-
+
private void forceSinglePassGrouping(GroupingOperation operation) {
operation.setForceSinglePass(true);
for (GroupingOperation childOperation : operation.getChildren())
@@ -231,6 +234,7 @@ public class FastSearcher extends VespaBackEndSearcher {
* @param result result containing a partition of the unfilled hits
* @param summaryClass the summary class we want to fill with
**/
+ @Override
protected void doPartialFill(Result result, String summaryClass) {
if (result.isFilled(summaryClass)) return;
@@ -262,6 +266,34 @@ public class FastSearcher extends VespaBackEndSearcher {
return false;
}
+ private Result mergeResults(List<Result> results, Query query, Execution execution) {
+ if(results.size() == 1) {
+ return results.get(0);
+ }
+
+ Result result = new Result(query);
+
+ for (Result partialResult : results) {
+ result.mergeWith(partialResult);
+ result.hits().addAll(partialResult.hits().asUnorderedHits());
+ }
+
+ if (query.getOffset() != 0 || result.hits().size() > query.getHits()) {
+ // with multiple results, each partial result is expected to have
+ // offset = 0 to allow correct offset positioning after merge
+
+ if (result.getHitOrderer() != null) {
+ // Make sure we have the necessary data for sorting
+ fill(result, Execution.ATTRIBUTEPREFETCH, execution);
+ }
+ result.hits().trim(query.getOffset(), query.getHits());
+ }
+
+ // TODO grouping
+
+ return result;
+ }
+
private static @NonNull Optional<String> quotedSummaryClass(String summaryClass) {
return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass));
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index a6f98418a76..409d05e3aaf 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -213,11 +213,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
if (root == null || root instanceof NullItem) // root can become null after resolving and transformation?
return new Result(query);
- QueryPacket queryPacket = QueryPacket.create(query);
- int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0);
- queryPacket.setCompressionLimit(compressionLimit);
- if (compressionLimit != 0)
- queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4"));
+ QueryPacket queryPacket = createQueryPacket(query);
if (isLoggingFine())
getLogger().fine("made QueryPacket: " + queryPacket);
@@ -241,6 +237,15 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
return result;
}
+ protected QueryPacket createQueryPacket(Query query) {
+ QueryPacket queryPacket = QueryPacket.create(query);
+ int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0);
+ queryPacket.setCompressionLimit(compressionLimit);
+ if (compressionLimit != 0)
+ queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4"));
+ return queryPacket;
+ }
+
/**
* Returns a cached result, or null if no result was cached for this key
*
@@ -355,7 +360,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
s.append(" location=")
.append(query.getRanking().getLocation().toString());
}
-
+
if (query.getGroupingSessionCache()) {
s.append(" groupingSessionCache=true");
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
index 3f5ebe53d0d..fc337d589ec 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
@@ -8,19 +8,37 @@ import com.yahoo.search.Result;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
/**
- * CloseableChannel is an interface for running a search query and getting document summaries against some
- * content node, node group or dispatcher while abstracting the specifics of the invocation target.
+ * CloseableChannel is an interface for running a search query and getting document summaries against some content node, node group or
+ * dispatcher while abstracting the specifics of the invocation target. ClosebleChannel objects are stateful and should not be reused.
*
* @author ollivir
*/
public abstract class CloseableChannel implements Closeable {
- /** Retrieve the hits for the given {@link Query} */
- public abstract Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException;
+ /** Retrieve the hits for the given {@link Query}. The channel may return more than one result, in
+ * which case the caller is responsible for merging the results. If multiple results are returned
+ * and the search query had a hit offset other than zero, that offset will be set to zero and the
+ * number of requested hits will be adjusted accordingly. */
+ public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ sendSearchRequest(query, queryPacket);
+ return getSearchResults(cacheKey);
+ }
+
+ protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException;
+
+ protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException;
/** Retrieve document summaries for the unfilled hits in the given {@link Result} */
- public abstract void partialFill(Result result, String summaryClass);
+ public void partialFill(Result result, String summaryClass) {
+ sendPartialFillRequest(result, summaryClass);
+ getPartialFillResults(result, summaryClass);
+ }
+
+ protected abstract void getPartialFillResults(Result result, String summaryClass);
+
+ protected abstract void sendPartialFillRequest(Result result, String summaryClass);
protected abstract void closeChannel();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 0cf18852dd3..ce0d48f5638 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -27,6 +27,7 @@ import com.yahoo.slime.Cursor;
import com.yahoo.slime.Slime;
import com.yahoo.vespa.config.search.DispatchConfig;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -286,21 +287,31 @@ public class Dispatcher extends AbstractComponent {
}
public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) {
- Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
+ if (!query.getSelect().getGrouping().isEmpty()) {
+ return Optional.empty();
+ }
- return groupInCluster.flatMap(group -> {
- if(group.nodes().size() == 1) {
- SearchCluster.Node node = group.nodes().iterator().next();
- query.trace(false, 2, "Dispatching internally to ", group, " (", node.toString(), ")");
- CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key());
- channel.teardown(() -> {
- loadBalancer.releaseGroup(group);
- });
- return Optional.of(channel);
- } else {
- loadBalancer.releaseGroup(group);
- return Optional.empty();
+ Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
+ if (!groupInCluster.isPresent()) {
+ return Optional.empty();
+ }
+ SearchCluster.Group group = groupInCluster.get();
+ query.trace(false, 2, "Dispatching internally to ", group);
+
+ if (group.nodes().size() == 1) {
+ SearchCluster.Node node = group.nodes().iterator().next();
+ CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(),
+ node.key());
+ return Optional.of(channel);
+ } else {
+ query.setNoCache(true); // Note - multi-node request disables packet based caching
+
+ Map<Integer, CloseableChannel> subchannels = new HashMap<>();
+ for (SearchCluster.Node node : group.nodes()) {
+ subchannels.put(node.key(), new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()));
}
- });
+ CloseableChannel multinode = new InterleavedCloseableChannel(subchannels);
+ return Optional.of(multinode);
+ }
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java
new file mode 100644
index 00000000000..e461f6fc725
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java
@@ -0,0 +1,98 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.prelude.fastsearch.CacheKey;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.result.Hit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * InterleavedCloseableChannel uses multiple {@link CloseableChannel} objects to interface with
+ * content nodes in parallel. Operationally it first sends requests to all channels and then
+ * collects the results. The invoker of this class is responsible for merging the results if
+ * needed.
+ *
+ * @author ollivir
+ */
+public class InterleavedCloseableChannel extends CloseableChannel {
+ private final Map<Integer, CloseableChannel> subchannels;
+ private Map<Integer, Result> expectedFillResults = null;
+
+ public InterleavedCloseableChannel(Map<Integer, CloseableChannel> subchannels) {
+ this.subchannels = subchannels;
+ }
+
+ /** Sends search queries to the contained {@link CloseableChannel} subchannels. If the
+ * search query has an offset other than zero, it will be reset to zero and the expected
+ * hit amount will be adjusted accordingly. */
+ @Override
+ protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
+ for (CloseableChannel subchannel : subchannels.values()) {
+ Query subquery = query.clone();
+
+ subquery.setHits(subquery.getHits() + subquery.getOffset());
+ subquery.setOffset(0);
+ subchannel.sendSearchRequest(subquery, null);
+ }
+ }
+
+ @Override
+ protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
+ List<Result> results = new ArrayList<>();
+
+ for (CloseableChannel subchannel : subchannels.values()) {
+ results.addAll(subchannel.getSearchResults(cacheKey));
+ }
+ return results;
+ }
+
+ @Override
+ protected void sendPartialFillRequest(Result result, String summaryClass) {
+ expectedFillResults = new HashMap<>();
+
+ for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) {
+ Hit hit = it.next();
+ if (hit instanceof FastHit) {
+ FastHit fhit = (FastHit) hit;
+ Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery()));
+ res.hits().add(fhit);
+ }
+ }
+ expectedFillResults.forEach((distKey, partialResult) -> {
+ CloseableChannel channel = subchannels.get(distKey);
+ if (channel != null) {
+ channel.sendPartialFillRequest(partialResult, summaryClass);
+ }
+ });
+ }
+
+ @Override
+ protected void getPartialFillResults(Result result, String summaryClass) {
+ if (expectedFillResults == null) {
+ return;
+ }
+ expectedFillResults.forEach((distKey, partialResult) -> {
+ CloseableChannel channel = subchannels.get(distKey);
+ if (channel != null) {
+ channel.getPartialFillResults(partialResult, summaryClass);
+ }
+ });
+ }
+
+ @Override
+ protected void closeChannel() {
+ if (!subchannels.isEmpty()) {
+ subchannels.values().forEach(CloseableChannel::close);
+ subchannels.clear();
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 269d16fd24d..455696c16b1 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -26,17 +26,14 @@ public class LoadBalancer {
private static final CompoundName QUERY_NODE_GROUP_AFFINITY = new CompoundName("dispatch.group.affinity");
- private final boolean isInternallyDispatchable;
private final List<GroupSchedule> scoreboard;
private int needle = 0;
public LoadBalancer(SearchCluster searchCluster) {
if (searchCluster == null) {
- this.isInternallyDispatchable = false;
this.scoreboard = null;
return;
}
- this.isInternallyDispatchable = (searchCluster.groupSize() == 1);
this.scoreboard = new ArrayList<>(searchCluster.groups().size());
for (Group group : searchCluster.groups().values()) {
@@ -53,7 +50,7 @@ public class LoadBalancer {
* @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used
*/
public Optional<Group> takeGroupForQuery(Query query) {
- if (!isInternallyDispatchable) {
+ if (scoreboard == null) {
return Optional.empty();
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
index e94c11e4473..5fa9dee8370 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -47,18 +47,7 @@ public class LoadBalancerTest {
}
@Test
- public void requreThatLoadBalancerIgnoresClusteredSingleGroup() {
- Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
- Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
- SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 2, null);
- LoadBalancer lb = new LoadBalancer(cluster);
-
- Optional<Group> grp = lb.takeGroupForQuery(new Query());
- assertThat(grp.isPresent(), is(false));
- }
-
- @Test
- public void requreThatLoadBalancerIgnoresClusteredGroups() {
+ public void requreThatLoadBalancerServesClusteredGroups() {
Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1);
@@ -67,7 +56,7 @@ public class LoadBalancerTest {
LoadBalancer lb = new LoadBalancer(cluster);
Optional<Group> grp = lb.takeGroupForQuery(new Query());
- assertThat(grp.isPresent(), is(false));
+ assertThat(grp.isPresent(), is(true));
}
@Test