diff options
author | Olli Virtanen <ovirtanen@gmail.com> | 2018-10-25 14:45:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-25 14:45:14 +0200 |
commit | 5ad84ff5345f06adb7995c42c8e6c56babb7eeea (patch) | |
tree | b1b0da6f157cd4c75920cfdd9b513fc2865c388c | |
parent | 1cf9d739771c850fe5ed3366612e39372efdc0df (diff) | |
parent | 418bab4e2cb402d1db6cfb3f004bfc12c790f719 (diff) |
Merge pull request #7433 from vespa-engine/ollivir/dispatch-distribution-fixes
Fixes to java dispatcher load distribution
10 files changed, 235 insertions, 114 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java index 985b6e1e4b0..623a963f77a 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java @@ -3,16 +3,17 @@ package com.yahoo.vespa.model.search; import com.yahoo.config.application.api.DeployLogger; import com.yahoo.config.model.deploy.DeployState; -import com.yahoo.log.LogLevel; -import com.yahoo.vespa.config.search.AttributesConfig; -import com.yahoo.vespa.config.search.DispatchConfig; -import com.yahoo.vespa.config.search.core.ProtonConfig; -import com.yahoo.vespa.config.search.RankProfilesConfig; import com.yahoo.config.model.producer.AbstractConfigProducer; +import com.yahoo.log.LogLevel; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.search.config.IndexInfoConfig; import com.yahoo.searchdefinition.DocumentOnlySearch; import com.yahoo.searchdefinition.derived.DerivedConfiguration; +import com.yahoo.vespa.config.search.AttributesConfig; +import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.vespa.config.search.DispatchConfig.DistributionPolicy; +import com.yahoo.vespa.config.search.RankProfilesConfig; +import com.yahoo.vespa.config.search.core.ProtonConfig; import com.yahoo.vespa.configdefinition.IlscriptsConfig; import com.yahoo.vespa.model.HostResource; import com.yahoo.vespa.model.SimpleConfigProducer; @@ -23,8 +24,11 @@ import com.yahoo.vespa.model.content.DispatchSpec; import com.yahoo.vespa.model.content.SearchCoverage; import java.io.File; -import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.logging.Logger; /** @@ -316,17 +320,17 @@ public class IndexedSearchCluster extends SearchCluster @Override public DerivedConfiguration getSdConfig() { return null; } - + @Override public void getConfig(IndexInfoConfig.Builder builder) { unionCfg.getConfig(builder); } - + @Override public void getConfig(IlscriptsConfig.Builder builder) { unionCfg.getConfig(builder); } - + @Override public void getConfig(AttributesConfig.Builder builder) { unionCfg.getConfig(builder); @@ -402,6 +406,19 @@ public class IndexedSearchCluster extends SearchCluster nodeBuilder.fs4port(node.getDispatchPort()); if (tuning.dispatch.minActiveDocsCoverage != null) builder.minActivedocsPercentage(tuning.dispatch.minActiveDocsCoverage); + if (tuning.dispatch.minGroupCoverage != null) + builder.minGroupCoverage(tuning.dispatch.minGroupCoverage); + if (tuning.dispatch.policy != null) { + switch (tuning.dispatch.policy) { + case RANDOM: + builder.distributionPolicy(DistributionPolicy.RANDOM); + break; + case ROUNDROBIN: + builder.distributionPolicy(DistributionPolicy.ROUNDROBIN); + break; + } + } + builder.maxNodesDownPerGroup(rootDispatch.getMaxNodesDownPerFixedRow()); builder.node(nodeBuilder); } } diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def index d8ef600a33f..602d3b17a8e 100644 --- a/configdefinitions/src/vespa/dispatch.def +++ b/configdefinitions/src/vespa/dispatch.def @@ -7,6 +7,15 @@ namespace=vespa.config.search # for that group to be included in queries minActivedocsPercentage double default=97.0 +# Minimum coverage for allowing a group to be considered for serving +minGroupCoverage double default=100 + +# Maximum number of nodes allowed to be down for group to be considered for serving +maxNodesDownPerGroup int default=0 + +# Distribution policy for group selection +distributionPolicy enum { ROUNDROBIN, RANDOM } default=ROUNDROBIN + # The unique key of a search node node[].key int diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java index 2a90e746378..202ee94383f 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/Backend.java @@ -354,6 +354,31 @@ public class Backend implements ConnectionFactory { } /** + * Attempt to establish a connection without sending messages and then + * return it to the pool. The assumption is that if the probing is + * successful, the connection will be used soon after. There should be + * minimal overhead since the connection is cached. + */ + public boolean probeConnection() { + if (shutdownInitiated) { + return false; + } + + FS4Connection connection = null; + try { + connection = getConnection(); + } catch (IOException ignored) { + // connection is null + } finally { + if (connection != null) { + returnConnection(connection); + } + } + + return connection != null; + } + + /** * This method should be used to ensure graceful shutdown of the backend. */ public void shutdown() { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index 08dcbe17db2..f68bb718c8d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -2,9 +2,9 @@ package com.yahoo.prelude.fastsearch; import com.google.common.collect.ImmutableMap; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.CloseableInvoker; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InterleavedSearchInvoker; @@ -12,7 +12,6 @@ import com.yahoo.search.dispatch.SearchCluster; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.result.Hit; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -43,31 +42,64 @@ public class FS4InvokerFactory { } public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) { - return new FS4SearchInvoker(searcher, query, fs4ResourcePool, node); + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + return new FS4SearchInvoker(searcher, query, backend.openChannel(), node); } + /** + * Create a {@link SearchInvoker} for a list of content nodes. + * + * @param query the search query being processed + * @param nodes pre-selected list of content nodes + * @return Optional containing the SearchInvoker or <i>empty</i> if some node in the list is invalid + */ public Optional<SearchInvoker> getSearchInvoker(Query query, List<SearchCluster.Node> nodes) { - return getInvoker(nodes, node -> getSearchInvoker(query, node), InterleavedSearchInvoker::new); + Map<Integer, SearchInvoker> invokers = new HashMap<>(); + for (SearchCluster.Node node : nodes) { + if (node.isWorking()) { + Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); + if (backend.probeConnection()) { + invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), node)); + } else { + return Optional.empty(); + } + } + } + if (invokers.size() == 1) { + return Optional.of(invokers.values().iterator().next()); + } else { + return Optional.of(new InterleavedSearchInvoker(invokers)); + } } public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) { return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); } + /** + * Create a {@link FillInvoker} for a the hits in a {@link Result}. + * + * @param result the Result containing hits that need to be filled + * @return Optional containing the FillInvoker or <i>empty</i> if some hit is from an unknown content node + */ public Optional<FillInvoker> getFillInvoker(Result result) { Collection<Integer> requiredNodes = requiredFillNodes(result); - List<SearchCluster.Node> nodes = new ArrayList<>(requiredNodes.size()); + Query query = result.getQuery(); + Map<Integer, FillInvoker> invokers = new HashMap<>(); for (Integer distKey : requiredNodes) { SearchCluster.Node node = nodesByKey.get(distKey); if (node == null) { return Optional.empty(); } - nodes.add(node); + invokers.put(distKey, getFillInvoker(query, node)); } - Query query = result.getQuery(); - return getInvoker(nodes, node -> getFillInvoker(query, node), InterleavedFillInvoker::new); + if (invokers.size() == 1) { + return Optional.of(invokers.values().iterator().next()); + } else { + return Optional.of(new InterleavedFillInvoker(invokers)); + } } private static Collection<Integer> requiredFillNodes(Result result) { @@ -81,40 +113,4 @@ public class FS4InvokerFactory { } return requiredNodes; } - - @FunctionalInterface - private interface InvokerConstructor<INVOKER> { - INVOKER construct(SearchCluster.Node node); - } - - @FunctionalInterface - private interface ClusterInvokerConstructor<CLUSTERINVOKER extends INVOKER, INVOKER> { - CLUSTERINVOKER construct(Map<Integer, INVOKER> subinvokers); - } - - /* Get an invocation object for the provided collection of nodes. If only one - node is used, only the single-node invoker is used. For multiple nodes, each - gets a single-node invoker and they are all wrapped into a cluster invoker. - The functional interfaces are used to allow code reuse with SearchInvokers - and FillInvokers even though they don't share much class hierarchy. */ - private <INVOKER extends CloseableInvoker, CLUSTERINVOKER extends INVOKER> Optional<INVOKER> getInvoker( - Collection<SearchCluster.Node> nodes, InvokerConstructor<INVOKER> singleNodeCtor, - ClusterInvokerConstructor<CLUSTERINVOKER, INVOKER> clusterCtor) { - if (nodes.size() == 1) { - SearchCluster.Node node = nodes.iterator().next(); - return Optional.of(singleNodeCtor.construct(node)); - } else { - Map<Integer, INVOKER> nodeInvokers = new HashMap<>(); - for (SearchCluster.Node node : nodes) { - if (node.isWorking()) { - nodeInvokers.put(node.key(), singleNodeCtor.construct(node)); - } - } - if (nodeInvokers.size() == 1) { - return Optional.of(nodeInvokers.values().iterator().next()); - } else { - return Optional.of(clusterCtor.construct(nodeInvokers)); - } - } - } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index ac48aef7063..dc8cd53e638 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -39,12 +39,11 @@ public class FS4SearchInvoker extends SearchInvoker { private Query query = null; private QueryPacket queryPacket = null; - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, SearchCluster.Node node) { + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, SearchCluster.Node node) { this.searcher = searcher; this.node = Optional.of(node); + this.channel = channel; - Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); - this.channel = backend.openChannel(); channel.setQuery(query); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2fdb10067ff..235e7af09d2 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -14,9 +14,11 @@ import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -42,14 +44,15 @@ public class Dispatcher extends AbstractComponent { public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); - this.loadBalancer = new LoadBalancer(searchCluster); + this.loadBalancer = new LoadBalancer(searchCluster, + dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); this.rpcResourcePool = new RpcResourcePool(dispatchConfig); } /** For testing */ public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) { this.searchCluster = null; - this.loadBalancer = new LoadBalancer(searchCluster); + this.loadBalancer = new LoadBalancer(searchCluster, true); this.rpcResourcePool = new RpcResourcePool(client, nodeConnections); } @@ -120,19 +123,37 @@ public class Dispatcher extends AbstractComponent { return invokerFactory.supply(query, Arrays.asList(node)); } - Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); - if (!groupInCluster.isPresent()) { - return Optional.empty(); - } - SearchCluster.Group group = groupInCluster.get(); - query.trace(false, 2, "Dispatching internally to ", group); - - Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes()); - if (invoker.isPresent()) { - invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); - } else { - loadBalancer.releaseGroup(group); + Set<Integer> tried = null; + int max = searchCluster.groups().size(); + for (int attempt = 0; attempt < max; attempt++) { + Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); + if (! groupInCluster.isPresent()) { + // No groups available + break; + } + SearchCluster.Group group = groupInCluster.get(); + if (tried != null && tried.contains(group.id())) { + // bail out: LB is offering a previously discarded group + loadBalancer.releaseGroup(group); + break; + } + + Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes()); + if (invoker.isPresent()) { + query.trace(false, 2, "Dispatching internally to ", group); + invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); + return invoker; + } else { + // invoker could not be produced (likely connectivity issue) + searchCluster.groupConnectionFailure(group); + loadBalancer.releaseGroup(group); + if (tried == null) { + tried = new HashSet<>(); + } + tried.add(group.id()); + } } - return invoker; + + return Optional.empty(); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 9eac9b9b63d..64e38a488ab 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -19,24 +19,27 @@ import java.util.logging.Logger; */ public class LoadBalancer { // The implementation here is a simplistic least queries in flight + round-robin load balancer - // TODO: consider the options in com.yahoo.vespa.model.content.TuningDispatch private static final Logger log = Logger.getLogger(LoadBalancer.class.getName()); private final List<GroupSchedule> scoreboard; private int needle = 0; - public LoadBalancer(SearchCluster searchCluster) { + public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) { if (searchCluster == null) { this.scoreboard = null; return; } this.scoreboard = new ArrayList<>(searchCluster.groups().size()); - for (Group group : searchCluster.groups().values()) { + for (Group group : searchCluster.orderedGroups()) { scoreboard.add(new GroupSchedule(group)); } - Collections.shuffle(scoreboard); + + if(! roundRobin) { + // TODO - More randomness could be desirable + Collections.shuffle(scoreboard); + } } /** @@ -74,16 +77,18 @@ public class LoadBalancer { private Optional<Group> allocateNextGroup() { synchronized (this) { GroupSchedule bestSchedule = null; + int bestIndex = needle; int index = needle; for (int i = 0; i < scoreboard.size(); i++) { GroupSchedule sched = scoreboard.get(index); if (sched.isPreferredOver(bestSchedule)) { bestSchedule = sched; + bestIndex = index; } index = nextScoreboardIndex(index); } - needle = nextScoreboardIndex(needle); + needle = nextScoreboardIndex(bestIndex); Group ret = null; if (bestSchedule != null) { @@ -118,9 +123,18 @@ public class LoadBalancer { if (other == null) { return true; } - if (! group.hasSufficientCoverage()) { - return false; + + // different coverage + if (this.group.hasSufficientCoverage() != other.group.hasSufficientCoverage()) { + if (! this.group.hasSufficientCoverage()) { + // this doesn't have coverage, other does + return false; + } else { + // other doesn't have coverage, this does + return true; + } } + return this.score < other.score; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index 0d50702acfd..e26dd5648eb 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -46,7 +46,9 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ - private double minActivedocsCoveragePercentage; + private final double minActivedocsCoveragePercentage; + private final double minGroupCoverage; + private final int maxNodesDownPerGroup; private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; @@ -67,15 +69,16 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { // Only needed until query requests are moved to rpc private final FS4ResourcePool fs4ResourcePool; - public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { - this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool, - containerClusterSize, vipStatus); + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { + this(dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(), + toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus); } - public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { + public SearchCluster(double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List<Node> nodes, FS4ResourcePool fs4ResourcePool, + int containerClusterSize, VipStatus vipStatus) { this.minActivedocsCoveragePercentage = minActivedocsCoverage; + this.minGroupCoverage = minGroupCoverage; + this.maxNodesDownPerGroup = maxNodesDownPerGroup; this.size = nodes.size(); this.fs4ResourcePool = fs4ResourcePool; this.vipStatus = vipStatus; @@ -153,6 +156,9 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { /** Returns the groups of this cluster as an immutable map indexed by group id */ public ImmutableMap<Integer, Group> groups() { return groups; } + /** Returns the groups of this cluster as an immutable list in introduction order */ + public ImmutableList<Group> orderedGroups() { return orderedGroups; } + /** Returns the n'th (zero-indexed) group in the cluster if possible */ public Optional<Group> group(int n) { if (orderedGroups.size() > n) { @@ -210,6 +216,10 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { vipStatus.removeFromRotation(this); } + public void groupConnectionFailure(Group group) { + group.setHasSufficientCoverage(false); // will be reset after next ping iteration + } + private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { // update VIP status if we direct dispatch to this group and coverage status changed if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { @@ -254,27 +264,54 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { */ @Override public void pingIterationCompleted() { + int numGroups = orderedGroups.size(); + if (numGroups == 1) { + Group group = groups.values().iterator().next(); + group.aggregateActiveDocuments(); + updateSufficientCoverage(group, true); // by definition + return; + } + // Update active documents per group and use it to decide if the group should be active - for (Group group : groups.values()) + + long[] activeDocumentsInGroup = new long[numGroups]; + long sumOfActiveDocuments = 0; + for(int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); group.aggregateActiveDocuments(); - if (groups.size() == 1) { - updateSufficientCoverage(groups.values().iterator().next(), true); // by definition - } else { - for (Group currentGroup : groups.values()) { - long sumOfAactiveDocumentsInOtherGroups = 0; - for (Group otherGroup : groups.values()) - if (otherGroup != currentGroup) - sumOfAactiveDocumentsInOtherGroups += otherGroup.getActiveDocuments(); - long averageDocumentsInOtherGroups = sumOfAactiveDocumentsInOtherGroups / (groups.size() - 1); - if (averageDocumentsInOtherGroups == 0) - updateSufficientCoverage(currentGroup, true); // no information about any group; assume coverage - else - updateSufficientCoverage(currentGroup, - 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage); + activeDocumentsInGroup[i] = group.getActiveDocuments(); + sumOfActiveDocuments += activeDocumentsInGroup[i]; + } + + for (int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); + long activeDocuments = activeDocumentsInGroup[i]; + long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); + boolean sufficientCoverage = true; + + if (averageDocumentsInOtherGroups > 0) { + double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; + sufficientCoverage = coverage >= minActivedocsCoveragePercentage; } + if (sufficientCoverage) { + sufficientCoverage = isNodeCoverageSufficient(group); + } + updateSufficientCoverage(group, sufficientCoverage); } } + private boolean isNodeCoverageSufficient(Group group) { + int nodesUp = 0; + for (Node node : group.nodes()) { + if (node.isWorking()) { + nodesUp++; + } + } + int nodes = group.nodes().size(); + int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) nodes * (100.0 - minGroupCoverage)) / 100.0); + return nodesUp + nodesAllowedDown >= nodes; + } + private Pong getPong(FutureTask<Pong> futurePong, Node node) { try { return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); @@ -346,8 +383,11 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { void aggregateActiveDocuments() { long activeDocumentsInGroup = 0; - for (Node node : nodes) - activeDocumentsInGroup += node.getActiveDocuments(); + for (Node node : nodes) { + if (node.isWorking()) { + activeDocumentsInGroup += node.getActiveDocuments(); + } + } activeDocuments.set(activeDocumentsInGroup); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index b08a3a73a01..9311ddab3c6 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -22,8 +22,8 @@ public class LoadBalancerTest { @Test public void requreThatLoadBalancerServesSingleNodeSetups() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1), null, 1, null); - LoadBalancer lb = new LoadBalancer(cluster); + SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster, true); Optional<Group> grp = lb.takeGroupForQuery(new Query()); Group group = grp.orElseGet(() -> { @@ -36,8 +36,8 @@ public class LoadBalancerTest { public void requreThatLoadBalancerServesMultiGroupSetups() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); - LoadBalancer lb = new LoadBalancer(cluster); + SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster, true); Optional<Group> grp = lb.takeGroupForQuery(new Query()); Group group = grp.orElseGet(() -> { @@ -52,8 +52,8 @@ public class LoadBalancerTest { Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0); Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1); Node n4 = new SearchCluster.Node(1, "test-node4", 1, 1); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2, n3, n4), null, 2, null); - LoadBalancer lb = new LoadBalancer(cluster); + SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2, n3, n4), null, 2, null); + LoadBalancer lb = new LoadBalancer(cluster, true); Optional<Group> grp = lb.takeGroupForQuery(new Query()); assertThat(grp.isPresent(), is(true)); @@ -63,8 +63,8 @@ public class LoadBalancerTest { public void requreThatLoadBalancerReturnsDifferentGroups() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); - LoadBalancer lb = new LoadBalancer(cluster); + SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster, true); // get first group Optional<Group> grp = lb.takeGroupForQuery(new Query()); @@ -83,8 +83,8 @@ public class LoadBalancerTest { public void requreThatLoadBalancerReturnsGroupWithShortestQueue() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); - LoadBalancer lb = new LoadBalancer(cluster); + SearchCluster cluster = new SearchCluster(88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster, true); // get first group Optional<Group> grp = lb.takeGroupForQuery(new Query()); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 89b416f3293..ee903fd3fa0 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -19,7 +19,7 @@ public class MockSearchCluster extends SearchCluster { private final ImmutableMultimap<String, Node> nodesByHost; public MockSearchCluster(int groups, int nodesPerGroup) { - super(100, Collections.emptyList(), null, 1, null); + super(100, 100, 0, Collections.emptyList(), null, 1, null); ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder(); ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder(); |