diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 54 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java | 21 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java | 16 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java | 75 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java | 73 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java | 42 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java) | 221 |
7 files changed, 281 insertions, 221 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 286eee004c5..0dd682dee0e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,6 +11,9 @@ import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Arrays; @@ -33,6 +36,8 @@ import java.util.Set; * @author ollvir */ public class Dispatcher extends AbstractComponent { + private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3; + /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); @@ -66,11 +71,6 @@ public class Dispatcher extends AbstractComponent { rpcResourcePool.release(); } - @FunctionalInterface - private interface SearchInvokerSupplier { - Optional<SearchInvoker> supply(Query query, List<SearchCluster.Node> nodes); - } - public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, FS4InvokerFactory fs4InvokerFactory) { Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); @@ -102,6 +102,11 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } + @FunctionalInterface + private interface SearchInvokerSupplier { + Optional<SearchInvoker> supply(Query query, int groupId, List<Node> nodes, boolean acceptIncompleteCoverage); + } + // build invoker based on searchpath private Optional<SearchInvoker> getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) { String searchPath = query.getModel().getSearchPath(); @@ -109,11 +114,11 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } try { - List<SearchCluster.Node> nodes = SearchPath.selectNodes(searchPath, searchCluster); + List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster); if (nodes.isEmpty()) { return Optional.empty(); } else { - return invokerFactory.supply(query, nodes); + return invokerFactory.supply(query, -1, nodes, true); } } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(e.getMessage())); @@ -121,41 +126,34 @@ public class Dispatcher extends AbstractComponent { } private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { - Optional<SearchCluster.Node> directNode = searchCluster.directDispatchTarget(); + Optional<Node> directNode = searchCluster.directDispatchTarget(); if (directNode.isPresent()) { - SearchCluster.Node node = directNode.get(); + Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); - return invokerFactory.supply(query, Arrays.asList(node)); + return invokerFactory.supply(query, -1, Arrays.asList(node), true); } - Set<Integer> tried = null; - int max = searchCluster.groups().size(); - for (int attempt = 0; attempt < max; attempt++) { - Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); - if (! groupInCluster.isPresent()) { + int max = Integer.min(searchCluster.orderedGroups().size(), MAX_GROUP_SELECTION_ATTEMPTS); + Set<Integer> rejected = null; + for (int i = 0; i < max; i++) { + Optional<Group> groupInCluster = loadBalancer.takeGroupForQuery(query, rejected); + if (!groupInCluster.isPresent()) { // No groups available break; } - SearchCluster.Group group = groupInCluster.get(); - if (tried != null && tried.contains(group.id())) { - // bail out: LB is offering a previously discarded group - loadBalancer.releaseGroup(group); - break; - } - - Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes()); + Group group = groupInCluster.get(); + boolean acceptIncompleteCoverage = (i == max - 1); + Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.id(), group.nodes(), acceptIncompleteCoverage); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to ", group); invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); return invoker; } else { - // invoker could not be produced (likely connectivity issue) - searchCluster.groupConnectionFailure(group); loadBalancer.releaseGroup(group); - if (tried == null) { - tried = new HashSet<>(); + if (rejected == null) { + rejected = new HashSet<>(); } - tried.add(group.id()); + rejected.add(group.id()); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 64e38a488ab..22573fac2d9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -2,12 +2,14 @@ package com.yahoo.search.dispatch; import com.yahoo.search.Query; -import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,18 +49,19 @@ public class LoadBalancer { * {@link #releaseGroup} symmetrically for each taken allocation. * * @param query the query for which this allocation is made + * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set * @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used */ - public Optional<Group> takeGroupForQuery(Query query) { + public Optional<Group> takeGroupForQuery(Query query, Set<Integer> rejectedGroups) { if (scoreboard == null) { return Optional.empty(); } - return allocateNextGroup(); + return allocateNextGroup(rejectedGroups); } /** - * Release an allocation given by {@link #takeGroupForQuery(Query)}. The release must be done exactly once for each allocation. + * Release an allocation given by {@link #takeGroupForQuery}. The release must be done exactly once for each allocation. * * @param group * previously allocated group @@ -74,7 +77,7 @@ public class LoadBalancer { } } - private Optional<Group> allocateNextGroup() { + private Optional<Group> allocateNextGroup(Set<Integer> rejectedGroups) { synchronized (this) { GroupSchedule bestSchedule = null; int bestIndex = needle; @@ -82,9 +85,11 @@ public class LoadBalancer { int index = needle; for (int i = 0; i < scoreboard.size(); i++) { GroupSchedule sched = scoreboard.get(index); - if (sched.isPreferredOver(bestSchedule)) { - bestSchedule = sched; - bestIndex = index; + if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) { + if (sched.isPreferredOver(bestSchedule)) { + bestSchedule = sched; + bestIndex = index; + } } index = nextScoreboardIndex(index); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java index 57f06225d27..6800a80b78f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java @@ -3,7 +3,9 @@ package com.yahoo.search.dispatch; import com.google.common.collect.ImmutableCollection; import com.yahoo.collections.Pair; -import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.searchcluster.Group; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; import java.util.Collection; @@ -37,7 +39,7 @@ public class SearchPath { * @return list of nodes chosen with the search path, or an empty list in which * case some other node selection logic should be used */ - public static List<SearchCluster.Node> selectNodes(String searchPath, SearchCluster cluster) { + public static List<Node> selectNodes(String searchPath, SearchCluster cluster) { Optional<SearchPath> sp = SearchPath.fromString(searchPath); if (sp.isPresent()) { return sp.get().mapToNodes(cluster); @@ -46,7 +48,7 @@ public class SearchPath { } } - public static Optional<SearchPath> fromString(String path) { + static Optional<SearchPath> fromString(String path) { if (path == null || path.isEmpty()) { return Optional.empty(); } @@ -73,23 +75,23 @@ public class SearchPath { this.group = group; } - private List<SearchCluster.Node> mapToNodes(SearchCluster cluster) { + private List<Node> mapToNodes(SearchCluster cluster) { if (cluster.groups().isEmpty()) { return Collections.emptyList(); } - SearchCluster.Group selectedGroup = selectGroup(cluster); + Group selectedGroup = selectGroup(cluster); if (nodes.isEmpty()) { return selectedGroup.nodes(); } - List<SearchCluster.Node> groupNodes = selectedGroup.nodes(); + List<Node> groupNodes = selectedGroup.nodes(); Set<Integer> wanted = new HashSet<>(); int max = groupNodes.size(); for (NodeSelection node : nodes) { wanted.addAll(node.matches(max)); } - List<SearchCluster.Node> ret = new ArrayList<>(); + List<Node> ret = new ArrayList<>(); for (int idx : wanted) { ret.add(groupNodes.get(idx)); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java new file mode 100644 index 00000000000..01cbc5cd307 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -0,0 +1,75 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A group in a search cluster. This class is multithread safe. + * + * @author bratseth + * @author ollivir + */ +public class Group { + + private final int id; + private final ImmutableList<Node> nodes; + + private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); + private final AtomicLong activeDocuments = new AtomicLong(0); + + public Group(int id, List<Node> nodes) { + this.id = id; + this.nodes = ImmutableList.copyOf(nodes); + } + + /** Returns the unique identity of this group */ + public int id() { return id; } + + /** Returns the nodes in this group as an immutable list */ + public ImmutableList<Node> nodes() { return nodes; } + + /** + * Returns whether this group has sufficient active documents + * (compared to other groups) that is should receive traffic + */ + public boolean hasSufficientCoverage() { + return hasSufficientCoverage.get(); + } + + void setHasSufficientCoverage(boolean sufficientCoverage) { + hasSufficientCoverage.lazySet(sufficientCoverage); + } + + void aggregateActiveDocuments() { + long activeDocumentsInGroup = 0; + for (Node node : nodes) { + if (node.isWorking()) { + activeDocumentsInGroup += node.getActiveDocuments(); + } + } + activeDocuments.set(activeDocumentsInGroup); + + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + long getActiveDocuments() { + return this.activeDocuments.get(); + } + + @Override + public String toString() { return "search group " + id; } + + @Override + public int hashCode() { return id; } + + @Override + public boolean equals(Object other) { + if (other == this) return true; + if (!(other instanceof Group)) return false; + return ((Group) other).id == this.id; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java new file mode 100644 index 00000000000..98deb9e3199 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -0,0 +1,73 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A node in a search cluster. This class is multithread safe. + * + * @author bratseth + * @author ollivir + */ +public class Node { + + private final int key; + private final String hostname; + private final int fs4port; + final int group; + + private final AtomicBoolean working = new AtomicBoolean(true); + private final AtomicLong activeDocuments = new AtomicLong(0); + + public Node(int key, String hostname, int fs4port, int group) { + this.key = key; + this.hostname = hostname; + this.fs4port = fs4port; + this.group = group; + } + + /** Returns the unique and stable distribution key of this node */ + public int key() { return key; } + + public String hostname() { return hostname; } + + public int fs4port() { return fs4port; } + + /** Returns the id of this group this node belongs to */ + public int group() { return group; } + + public void setWorking(boolean working) { + this.working.lazySet(working); + } + + /** Returns whether this node is currently responding to requests */ + public boolean isWorking() { return working.get(); } + + /** Updates the active documents on this node */ + void setActiveDocuments(long activeDocuments) { + this.activeDocuments.set(activeDocuments); + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + public long getActiveDocuments() { + return this.activeDocuments.get(); + } + + @Override + public int hashCode() { return Objects.hash(hostname, fs4port); } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Node)) return false; + Node other = (Node)o; + if ( ! Objects.equals(this.hostname, other.hostname)) return false; + if ( ! Objects.equals(this.fs4port, other.fs4port)) return false; + return true; + } + + @Override + public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java new file mode 100644 index 00000000000..7c7a9cb1d1c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -0,0 +1,42 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; + +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.Exceptions; + +import java.util.concurrent.Callable; + +/** + * @author bratseth + * @author ollivir + */ +class Pinger implements Callable<Pong> { + private final Node node; + private final ClusterMonitor<Node> clusterMonitor; + private final FS4ResourcePool fs4ResourcePool; + + public Pinger(Node node, ClusterMonitor<Node> clusterMonitor, FS4ResourcePool fs4ResourcePool) { + this.node = node; + this.clusterMonitor = clusterMonitor; + this.fs4ResourcePool = fs4ResourcePool; + } + + public Pong call() { + try { + Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); + if (pong.activeDocuments().isPresent()) + node.setActiveDocuments(pong.activeDocuments().get()); + return pong; + } catch (RuntimeException e) { + return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); + } + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index e26dd5648eb..b0e63d20931 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -1,5 +1,5 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.searchcluster; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; @@ -11,27 +11,18 @@ import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; - -// Only needed until query requests are moved to rpc -import com.yahoo.prelude.Ping; -import com.yahoo.prelude.fastsearch.FastSearcher; -import com.yahoo.yolean.Exceptions; import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -41,7 +32,7 @@ import java.util.stream.Collectors; * * @author bratseth */ -public class SearchCluster implements NodeManager<SearchCluster.Node> { +public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); @@ -128,8 +119,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { // Only use direct dispatch if we have exactly 1 search node on the same machine: if (localSearchNodes.size() != 1) return Optional.empty(); - SearchCluster.Node localSearchNode = localSearchNodes.iterator().next(); - SearchCluster.Group localSearchGroup = groups.get(localSearchNode.group()); + Node localSearchNode = localSearchNodes.iterator().next(); + Group localSearchGroup = groups.get(localSearchNode.group()); // Only use direct dispatch if the local search node has the entire corpus if (localSearchGroup.nodes().size() != 1) return Optional.empty(); @@ -188,7 +179,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { if ( ! directDispatchTarget.isPresent()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - SearchCluster.Group localSearchGroup = groups.get(directDispatchTarget.get().group()); + Group localSearchGroup = groups.get(directDispatchTarget.get().group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is up @@ -216,10 +207,6 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { vipStatus.removeFromRotation(this); } - public void groupConnectionFailure(Group group) { - group.setHasSufficientCoverage(false); // will be reset after next ping iteration - } - private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { // update VIP status if we direct dispatch to this group and coverage status changed if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { @@ -245,7 +232,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { - Pinger pinger = new Pinger(node); + Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool); FutureTask<Pong> futurePong = new FutureTask<>(pinger); executor.execute(futurePong); Pong pong = getPong(futurePong, node); @@ -287,29 +274,34 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { Group group = orderedGroups.get(i); long activeDocuments = activeDocumentsInGroup[i]; long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); - boolean sufficientCoverage = true; - - if (averageDocumentsInOtherGroups > 0) { - double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; - sufficientCoverage = coverage >= minActivedocsCoveragePercentage; - } - if (sufficientCoverage) { - sufficientCoverage = isNodeCoverageSufficient(group); - } + boolean sufficientCoverage = isGroupCoverageSufficient(group.nodes(), activeDocuments, averageDocumentsInOtherGroups); updateSufficientCoverage(group, sufficientCoverage); } } - private boolean isNodeCoverageSufficient(Group group) { + private boolean isGroupCoverageSufficient(List<Node> nodes, long activeDocuments, long averageDocumentsInOtherGroups) { + boolean sufficientCoverage = true; + + if (averageDocumentsInOtherGroups > 0) { + double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; + sufficientCoverage = coverage >= minActivedocsCoveragePercentage; + } + if (sufficientCoverage) { + sufficientCoverage = isGroupNodeCoverageSufficient(nodes); + } + return sufficientCoverage; + } + + private boolean isGroupNodeCoverageSufficient(List<Node> nodes) { int nodesUp = 0; - for (Node node : group.nodes()) { + for (Node node : nodes) { if (node.isWorking()) { nodesUp++; } } - int nodes = group.nodes().size(); - int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) nodes * (100.0 - minGroupCoverage)) / 100.0); - return nodesUp + nodesAllowedDown >= nodes; + int numNodes = nodes.size(); + int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) numNodes * (100.0 - minGroupCoverage)) / 100.0); + return nodesUp + nodesAllowedDown >= numNodes; } private Pong getPong(FutureTask<Pong> futurePong, Node node) { @@ -326,153 +318,26 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { } } - private class Pinger implements Callable<Pong> { - - private final Node node; - - public Pinger(Node node) { - this.node = node; - } - - public Pong call() { - try { - Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), - fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString()); - if (pong.activeDocuments().isPresent()) - node.setActiveDocuments(pong.activeDocuments().get()); - return pong; - } catch (RuntimeException e) { - return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " - + Exceptions.toMessageString(e))); - } - } - - } - - /** A group in a search cluster. This class is multithread safe. */ - public static class Group { - - private final int id; - private final ImmutableList<Node> nodes; - - private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); - private final AtomicLong activeDocuments = new AtomicLong(0); - - public Group(int id, List<Node> nodes) { - this.id = id; - this.nodes = ImmutableList.copyOf(nodes); - } - - /** Returns the unique identity of this group */ - public int id() { return id; } - - /** Returns the nodes in this group as an immutable list */ - public ImmutableList<Node> nodes() { return nodes; } - - /** - * Returns whether this group has sufficient active documents - * (compared to other groups) that is should receive traffic - */ - public boolean hasSufficientCoverage() { - return hasSufficientCoverage.get(); - } - - void setHasSufficientCoverage(boolean sufficientCoverage) { - hasSufficientCoverage.lazySet(sufficientCoverage); + /** + * Calculate whether a subset of nodes in a group has enough coverage + */ + public boolean isPartialGroupCoverageSufficient(int groupId, List<Node> nodes) { + if (orderedGroups.size() == 1) { + return true; } - - void aggregateActiveDocuments() { - long activeDocumentsInGroup = 0; - for (Node node : nodes) { - if (node.isWorking()) { - activeDocumentsInGroup += node.getActiveDocuments(); - } + long sumOfActiveDocuments = 0; + int otherGroups = 0; + for (Group g : orderedGroups) { + if (g.id() != groupId) { + sumOfActiveDocuments += g.getActiveDocuments(); + otherGroups++; } - activeDocuments.set(activeDocumentsInGroup); - } - - /** Returns the active documents on this node. If unknown, 0 is returned. */ - long getActiveDocuments() { - return this.activeDocuments.get(); + long activeDocuments = 0; + for (Node n : nodes) { + activeDocuments += n.getActiveDocuments(); } - - @Override - public String toString() { return "search group " + id; } - - @Override - public int hashCode() { return id; } - - @Override - public boolean equals(Object other) { - if (other == this) return true; - if (!(other instanceof Group)) return false; - return ((Group) other).id == this.id; - } - + long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups; + return isGroupCoverageSufficient(nodes, activeDocuments, averageDocumentsInOtherGroups); } - - /** A node in a search cluster. This class is multithread safe. */ - public static class Node { - - private final int key; - private final String hostname; - private final int fs4port; - private final int group; - - private final AtomicBoolean working = new AtomicBoolean(true); - private final AtomicLong activeDocuments = new AtomicLong(0); - - public Node(int key, String hostname, int fs4port, int group) { - this.key = key; - this.hostname = hostname; - this.fs4port = fs4port; - this.group = group; - } - - /** Returns the unique and stable distribution key of this node */ - public int key() { return key; } - - public String hostname() { return hostname; } - - public int fs4port() { return fs4port; } - - /** Returns the id of this group this node belongs to */ - public int group() { return group; } - - void setWorking(boolean working) { - this.working.lazySet(working); - } - - /** Returns whether this node is currently responding to requests */ - public boolean isWorking() { return working.get(); } - - /** Updates the active documents on this node */ - void setActiveDocuments(long activeDocuments) { - this.activeDocuments.set(activeDocuments); - } - - /** Returns the active documents on this node. If unknown, 0 is returned. */ - public long getActiveDocuments() { - return this.activeDocuments.get(); - } - - @Override - public int hashCode() { return Objects.hash(hostname, fs4port); } - - @Override - public boolean equals(Object o) { - if (o == this) return true; - if ( ! (o instanceof Node)) return false; - Node other = (Node)o; - if ( ! Objects.equals(this.hostname, other.hostname)) return false; - if ( ! Objects.equals(this.fs4port, other.fs4port)) return false; - return true; - } - - @Override - public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; } - - } - } |