diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-24 14:45:25 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-24 14:45:25 +0200 |
commit | 6002813ae4d7e012409f6d43abe5a83bddc63f06 (patch) | |
tree | f7911b8a516172b55ef45692c8f35451cb91e85b /container-search | |
parent | 3a18bab028097dec03bac8186d06c23536fda1c0 (diff) |
Multiple node java dispatch support
Diffstat (limited to 'container-search')
8 files changed, 288 insertions, 95 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java index dc95f83365e..10a640c54c8 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java @@ -21,15 +21,17 @@ import com.yahoo.search.result.HitGroup; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; +import static java.util.Arrays.asList; /** * {@link CloseableChannel} implementation for FS4 nodes and fdispatch - * + * * @author ollivir */ public class FS4CloseableChannel extends CloseableChannel { @@ -37,6 +39,14 @@ public class FS4CloseableChannel extends CloseableChannel { private FS4Channel channel; private final Optional<Integer> distributionKey; + private ErrorMessage pendingSearchError = null; + private Query query = null; + private QueryPacket queryPacket = null; + + private int expectedFillResults = 0; + private CacheKey summaryCacheKey = null; + private DocsumPacketKey[] summaryPacketKeys = null; + public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, int distributionKey) { this.searcher = searcher; @@ -56,32 +66,47 @@ public class FS4CloseableChannel extends CloseableChannel { } @Override - public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { if (isLoggingFine()) getLogger().finest("sending query packet"); + if(queryPacket == null) { + // query changed for subchannel + queryPacket = searcher.createQueryPacket(query); + } + + this.query = query; + this.queryPacket = queryPacket; + try { boolean couldSend = channel.sendPacket(queryPacket); - if (!couldSend) - return new Result(query, ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'")); + if (!couldSend) { + pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"); + } } catch (InvalidChannelException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel " + getName())); + pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()); } catch (IllegalStateException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage())); + pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()); } + } + @Override + protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + if(pendingSearchError != null) { + return asList(new Result(query, pendingSearchError)); + } BasicPacket[] basicPackets; try { basicPackets = channel.receivePackets(query.getTimeLeft(), 1); } catch (ChannelTimeoutException e) { - return new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); + return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()))); } catch (InvalidChannelException e) { - return new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())); + return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()))); } if (basicPackets.length == 0) { - return new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")); + return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"))); } if (isLoggingFine()) @@ -118,53 +143,67 @@ public class FS4CloseableChannel extends CloseableChannel { cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey); } } - return result; + return asList(result); } @Override - public void partialFill(Result result, String summaryClass) { - Packet[] receivedPackets; - DocsumPacketKey[] packetKeys; - - CacheKey cacheKey = null; - PacketWrapper packetWrapper = null; + protected void sendPartialFillRequest(Result result, String summaryClass) { + summaryCacheKey = null; if (searcher.getCacheControl().useCache(channel.getQuery())) { - cacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); - if (cacheKey == null) { + summaryCacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); + if (summaryCacheKey == null) { QueryPacket queryPacket = QueryPacket.create(channel.getQuery()); - cacheKey = new CacheKey(queryPacket); + summaryCacheKey = new CacheKey(queryPacket); + } + boolean cacheFound = cacheLookupTwoPhase(summaryCacheKey, result, summaryClass); + if (!cacheFound) { + summaryCacheKey = null; } - packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass); } if (countFastHits(result) > 0) { - packetKeys = getPacketKeys(result, summaryClass, false); - if (packetKeys.length == 0) { - receivedPackets = new Packet[0]; + summaryPacketKeys = getPacketKeys(result, summaryClass); + if (summaryPacketKeys.length == 0) { + expectedFillResults = 0; } else { try { - receivedPackets = fetchSummaries(result, summaryClass); + expectedFillResults = requestSummaries(result, summaryClass); } catch (InvalidChannelException e) { result.hits() .addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); return; - } catch (ChannelTimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); - return; } catch (IOException e) { result.hits().addError(ErrorMessage.createBackendCommunicationError( "IO error while talking on channel " + getName() + " (summary fetch): " + e.getMessage())); return; } - if (receivedPackets.length == 0) { - result.hits() - .addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); - return; - } } } else { - packetKeys = new DocsumPacketKey[0]; - receivedPackets = new Packet[0]; + expectedFillResults = 0; + } + } + + + @Override + protected void getPartialFillResults(Result result, String summaryClass) { + if (expectedFillResults == 0) { + return; + } + + Packet[] receivedPackets; + try { + receivedPackets = getSummaryResponses(result); + } catch (InvalidChannelException e1) { + result.hits().addError(ErrorMessage.createBackendCommunicationError("Invalid channel " + getName() + " (summary fetch)")); + return; + } catch (ChannelTimeoutException e1) { + result.hits().addError(ErrorMessage.createTimeout("timeout waiting for summaries from " + getName())); + return; + } + + if (receivedPackets.length == 0) { + result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " got no packets back (summary fetch)")); + return; } int skippedHits; @@ -183,8 +222,8 @@ public class FS4CloseableChannel extends CloseableChannel { "Error filling hits with summary fields, source: " + getName() + " Exception thrown: " + e.getMessage())); return; } - if (skippedHits == 0 && packetWrapper != null) { - searcher.getCacheControl().updateCacheEntry(cacheKey, channel.getQuery(), packetKeys, receivedPackets); + if (skippedHits == 0 && summaryCacheKey != null) { + searcher.getCacheControl().updateCacheEntry(summaryCacheKey, channel.getQuery(), summaryPacketKeys, receivedPackets); } if (skippedHits > 0) @@ -216,12 +255,12 @@ public class FS4CloseableChannel extends CloseableChannel { } } - private PacketWrapper cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) { + private boolean cacheLookupTwoPhase(CacheKey cacheKey, Result result, String summaryClass) { Query query = result.getQuery(); PacketWrapper packetWrapper = searcher.getCacheControl().lookup(cacheKey, query); if (packetWrapper == null) { - return null; + return false; } if (packetWrapper.getNumPackets() != 0) { for (Iterator<Hit> i = hitIterator(result); i.hasNext();) { @@ -241,7 +280,7 @@ public class FS4CloseableChannel extends CloseableChannel { result.analyzeHits(); } - return packetWrapper; + return true; } private CacheKey fetchCacheKeyFromHits(HitGroup hits, String summaryClass) { @@ -269,10 +308,8 @@ public class FS4CloseableChannel extends CloseableChannel { return count; } - private Packet[] fetchSummaries(Result result, String summaryClass) - throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException { + private int requestSummaries(Result result, String summaryClass) throws InvalidChannelException, ClassCastException, IOException { - BasicPacket[] receivedPackets; boolean summaryNeedsQuery = searcher.summaryNeedsQuery(result.getQuery()); if (result.getQuery().getTraceLevel() >= 3) result.getQuery().trace((summaryNeedsQuery ? "Resending " : "Not resending ") + "query during document summary fetching", 3); @@ -287,7 +324,15 @@ public class FS4CloseableChannel extends CloseableChannel { boolean couldSend = channel.sendPacket(docsumsPacket); if (!couldSend) throw new IOException("Could not successfully send GetDocSumsPacket."); - receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), docsumsPacket.getNumDocsums() + 1); + + return docsumsPacket.getNumDocsums() + 1; + } + + private Packet[] getSummaryResponses(Result result) throws InvalidChannelException, ChannelTimeoutException { + if(expectedFillResults == 0) { + return new Packet[0]; + } + BasicPacket[] receivedPackets = channel.receivePackets(result.getQuery().getTimeLeft(), expectedFillResults); return convertBasicPackets(receivedPackets); } @@ -295,11 +340,9 @@ public class FS4CloseableChannel extends CloseableChannel { /** * Returns an array of the hits contained in a result * - * @param filled - * true to return all hits, false to return only unfilled hits * @return array of docids, empty array if no hits */ - private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass, boolean filled) { + private DocsumPacketKey[] getPacketKeys(Result result, String summaryClass) { DocsumPacketKey[] packetKeys = new DocsumPacketKey[result.getHitCount()]; int x = 0; @@ -307,7 +350,7 @@ public class FS4CloseableChannel extends CloseableChannel { com.yahoo.search.result.Hit hit = i.next(); if (hit instanceof FastHit) { FastHit fastHit = (FastHit) hit; - if (filled || !fastHit.isFilled(summaryClass)) { + if (!fastHit.isFilled(summaryClass)) { packetKeys[x] = new DocsumPacketKey(fastHit.getGlobalId(), fastHit.getPartId(), summaryClass); x++; } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index d34d119c1fe..9acf48a7c67 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -28,6 +28,7 @@ import com.yahoo.search.searchchain.Execution; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.logging.Level; @@ -64,9 +65,9 @@ public class FastSearcher extends VespaBackEndSearcher { private final Dispatcher dispatcher; private final Backend dispatchBackend; - + private final FS4ResourcePool fs4ResourcePool; - + /** * Creates a Fastsearcher. * @@ -99,7 +100,7 @@ public class FastSearcher extends VespaBackEndSearcher { public Pong ping(Ping ping, Execution execution) { return ping(ping, dispatchBackend, getName()); } - + public static Pong ping(Ping ping, Backend backend, String name) { FS4Channel channel = backend.openPingChannel(); @@ -151,6 +152,7 @@ public class FastSearcher extends VespaBackEndSearcher { } } + @Override protected void transformQuery(Query query) { QueryRewrite.rewriteSddocname(query); } @@ -160,7 +162,8 @@ public class FastSearcher extends VespaBackEndSearcher { if (dispatcher.searchCluster().groupSize() == 1) forceSinglePassGrouping(query); try(CloseableChannel channel = getChannel(query)) { - Result result = channel.search(query, queryPacket, cacheKey); + List<Result> results = channel.search(query, queryPacket, cacheKey); + Result result = mergeResults(results, query, execution); if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) { // There is currently no correct choice for which @@ -182,13 +185,13 @@ public class FastSearcher extends VespaBackEndSearcher { return result; } } - + /** When we only search a single node, doing all grouping in one pass is more efficient */ private void forceSinglePassGrouping(Query query) { for (GroupingRequest groupingRequest : query.getSelect().getGrouping()) forceSinglePassGrouping(groupingRequest.getRootOperation()); } - + private void forceSinglePassGrouping(GroupingOperation operation) { operation.setForceSinglePass(true); for (GroupingOperation childOperation : operation.getChildren()) @@ -231,6 +234,7 @@ public class FastSearcher extends VespaBackEndSearcher { * @param result result containing a partition of the unfilled hits * @param summaryClass the summary class we want to fill with **/ + @Override protected void doPartialFill(Result result, String summaryClass) { if (result.isFilled(summaryClass)) return; @@ -262,6 +266,34 @@ public class FastSearcher extends VespaBackEndSearcher { return false; } + private Result mergeResults(List<Result> results, Query query, Execution execution) { + if(results.size() == 1) { + return results.get(0); + } + + Result result = new Result(query); + + for (Result partialResult : results) { + result.mergeWith(partialResult); + result.hits().addAll(partialResult.hits().asUnorderedHits()); + } + + if (query.getOffset() != 0 || result.hits().size() > query.getHits()) { + // with multiple results, each partial result is expected to have + // offset = 0 to allow correct offset positioning after merge + + if (result.getHitOrderer() != null) { + // Make sure we have the necessary data for sorting + fill(result, Execution.ATTRIBUTEPREFETCH, execution); + } + result.hits().trim(query.getOffset(), query.getHits()); + } + + // TODO grouping + + return result; + } + private static @NonNull Optional<String> quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index a6f98418a76..409d05e3aaf 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -213,11 +213,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { if (root == null || root instanceof NullItem) // root can become null after resolving and transformation? return new Result(query); - QueryPacket queryPacket = QueryPacket.create(query); - int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); - queryPacket.setCompressionLimit(compressionLimit); - if (compressionLimit != 0) - queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); + QueryPacket queryPacket = createQueryPacket(query); if (isLoggingFine()) getLogger().fine("made QueryPacket: " + queryPacket); @@ -241,6 +237,15 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { return result; } + protected QueryPacket createQueryPacket(Query query) { + QueryPacket queryPacket = QueryPacket.create(query); + int compressionLimit = query.properties().getInteger(PACKET_COMPRESSION_LIMIT, 0); + queryPacket.setCompressionLimit(compressionLimit); + if (compressionLimit != 0) + queryPacket.setCompressionType(query.properties().getString(PACKET_COMPRESSION_TYPE, "lz4")); + return queryPacket; + } + /** * Returns a cached result, or null if no result was cached for this key * @@ -355,7 +360,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { s.append(" location=") .append(query.getRanking().getLocation().toString()); } - + if (query.getGroupingSessionCache()) { s.append(" groupingSessionCache=true"); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java index 3f5ebe53d0d..fc337d589ec 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java @@ -8,19 +8,37 @@ import com.yahoo.search.Result; import java.io.Closeable; import java.io.IOException; +import java.util.List; /** - * CloseableChannel is an interface for running a search query and getting document summaries against some - * content node, node group or dispatcher while abstracting the specifics of the invocation target. + * CloseableChannel is an interface for running a search query and getting document summaries against some content node, node group or + * dispatcher while abstracting the specifics of the invocation target. ClosebleChannel objects are stateful and should not be reused. * * @author ollivir */ public abstract class CloseableChannel implements Closeable { - /** Retrieve the hits for the given {@link Query} */ - public abstract Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException; + /** Retrieve the hits for the given {@link Query}. The channel may return more than one result, in + * which case the caller is responsible for merging the results. If multiple results are returned + * and the search query had a hit offset other than zero, that offset will be set to zero and the + * number of requested hits will be adjusted accordingly. */ + public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + sendSearchRequest(query, queryPacket); + return getSearchResults(cacheKey); + } + + protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException; + + protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException; /** Retrieve document summaries for the unfilled hits in the given {@link Result} */ - public abstract void partialFill(Result result, String summaryClass); + public void partialFill(Result result, String summaryClass) { + sendPartialFillRequest(result, summaryClass); + getPartialFillResults(result, summaryClass); + } + + protected abstract void getPartialFillResults(Result result, String summaryClass); + + protected abstract void sendPartialFillRequest(Result result, String summaryClass); protected abstract void closeChannel(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 0cf18852dd3..ce0d48f5638 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -27,6 +27,7 @@ import com.yahoo.slime.Cursor; import com.yahoo.slime.Slime; import com.yahoo.vespa.config.search.DispatchConfig; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -286,21 +287,31 @@ public class Dispatcher extends AbstractComponent { } public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) { - Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); + if (!query.getSelect().getGrouping().isEmpty()) { + return Optional.empty(); + } - return groupInCluster.flatMap(group -> { - if(group.nodes().size() == 1) { - SearchCluster.Node node = group.nodes().iterator().next(); - query.trace(false, 2, "Dispatching internally to ", group, " (", node.toString(), ")"); - CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); - channel.teardown(() -> { - loadBalancer.releaseGroup(group); - }); - return Optional.of(channel); - } else { - loadBalancer.releaseGroup(group); - return Optional.empty(); + Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); + if (!groupInCluster.isPresent()) { + return Optional.empty(); + } + SearchCluster.Group group = groupInCluster.get(); + query.trace(false, 2, "Dispatching internally to ", group); + + if (group.nodes().size() == 1) { + SearchCluster.Node node = group.nodes().iterator().next(); + CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), + node.key()); + return Optional.of(channel); + } else { + query.setNoCache(true); // Note - multi-node request disables packet based caching + + Map<Integer, CloseableChannel> subchannels = new HashMap<>(); + for (SearchCluster.Node node : group.nodes()) { + subchannels.put(node.key(), new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key())); } - }); + CloseableChannel multinode = new InterleavedCloseableChannel(subchannels); + return Optional.of(multinode); + } } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java new file mode 100644 index 00000000000..e461f6fc725 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java @@ -0,0 +1,98 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.CacheKey; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.result.Hit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * InterleavedCloseableChannel uses multiple {@link CloseableChannel} objects to interface with + * content nodes in parallel. Operationally it first sends requests to all channels and then + * collects the results. The invoker of this class is responsible for merging the results if + * needed. + * + * @author ollivir + */ +public class InterleavedCloseableChannel extends CloseableChannel { + private final Map<Integer, CloseableChannel> subchannels; + private Map<Integer, Result> expectedFillResults = null; + + public InterleavedCloseableChannel(Map<Integer, CloseableChannel> subchannels) { + this.subchannels = subchannels; + } + + /** Sends search queries to the contained {@link CloseableChannel} subchannels. If the + * search query has an offset other than zero, it will be reset to zero and the expected + * hit amount will be adjusted accordingly. */ + @Override + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + for (CloseableChannel subchannel : subchannels.values()) { + Query subquery = query.clone(); + + subquery.setHits(subquery.getHits() + subquery.getOffset()); + subquery.setOffset(0); + subchannel.sendSearchRequest(subquery, null); + } + } + + @Override + protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + List<Result> results = new ArrayList<>(); + + for (CloseableChannel subchannel : subchannels.values()) { + results.addAll(subchannel.getSearchResults(cacheKey)); + } + return results; + } + + @Override + protected void sendPartialFillRequest(Result result, String summaryClass) { + expectedFillResults = new HashMap<>(); + + for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) { + Hit hit = it.next(); + if (hit instanceof FastHit) { + FastHit fhit = (FastHit) hit; + Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery())); + res.hits().add(fhit); + } + } + expectedFillResults.forEach((distKey, partialResult) -> { + CloseableChannel channel = subchannels.get(distKey); + if (channel != null) { + channel.sendPartialFillRequest(partialResult, summaryClass); + } + }); + } + + @Override + protected void getPartialFillResults(Result result, String summaryClass) { + if (expectedFillResults == null) { + return; + } + expectedFillResults.forEach((distKey, partialResult) -> { + CloseableChannel channel = subchannels.get(distKey); + if (channel != null) { + channel.getPartialFillResults(partialResult, summaryClass); + } + }); + } + + @Override + protected void closeChannel() { + if (!subchannels.isEmpty()) { + subchannels.values().forEach(CloseableChannel::close); + subchannels.clear(); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 269d16fd24d..455696c16b1 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -26,17 +26,14 @@ public class LoadBalancer { private static final CompoundName QUERY_NODE_GROUP_AFFINITY = new CompoundName("dispatch.group.affinity"); - private final boolean isInternallyDispatchable; private final List<GroupSchedule> scoreboard; private int needle = 0; public LoadBalancer(SearchCluster searchCluster) { if (searchCluster == null) { - this.isInternallyDispatchable = false; this.scoreboard = null; return; } - this.isInternallyDispatchable = (searchCluster.groupSize() == 1); this.scoreboard = new ArrayList<>(searchCluster.groups().size()); for (Group group : searchCluster.groups().values()) { @@ -53,7 +50,7 @@ public class LoadBalancer { * @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used */ public Optional<Group> takeGroupForQuery(Query query) { - if (!isInternallyDispatchable) { + if (scoreboard == null) { return Optional.empty(); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index e94c11e4473..5fa9dee8370 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -47,18 +47,7 @@ public class LoadBalancerTest { } @Test - public void requreThatLoadBalancerIgnoresClusteredSingleGroup() { - Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 2, null); - LoadBalancer lb = new LoadBalancer(cluster); - - Optional<Group> grp = lb.takeGroupForQuery(new Query()); - assertThat(grp.isPresent(), is(false)); - } - - @Test - public void requreThatLoadBalancerIgnoresClusteredGroups() { + public void requreThatLoadBalancerServesClusteredGroups() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1); @@ -67,7 +56,7 @@ public class LoadBalancerTest { LoadBalancer lb = new LoadBalancer(cluster); Optional<Group> grp = lb.takeGroupForQuery(new Query()); - assertThat(grp.isPresent(), is(false)); + assertThat(grp.isPresent(), is(true)); } @Test |