diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-01-15 15:05:19 +0100 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-01-15 15:05:19 +0100 |
commit | 9c03d3d5620c1ce2da81ba45518381df423eb2d1 (patch) | |
tree | 5604289f8201e3f7b561f63f128c3f242de44023 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster | |
parent | bb9a8be9b26191ef6887b352ab11f64bd75cda6b (diff) |
uselocalnode support and fix to coverage estimation
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java | 10 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 64 |
2 files changed, 56 insertions, 18 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index 01cbc5cd307..146c62f0a16 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -44,6 +44,16 @@ public class Group { hasSufficientCoverage.lazySet(sufficientCoverage); } + public int workingNodes() { + int nodesUp = 0; + for (Node node : nodes) { + if (node.isWorking()) { + nodesUp++; + } + } + return nodesUp; + } + void aggregateActiveDocuments() { long activeDocumentsInGroup = 0; for (Node node : nodes) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 8e278f78d7a..e497ef6751b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -102,7 +103,7 @@ public class SearchCluster implements NodeManager<Node> { private static Optional<Node> findDirectDispatchTarget(String selfHostname, int searchClusterSize, int containerClusterSize, - ImmutableMultimap<String, Node>nodesByHost, + ImmutableMultimap<String, Node> nodesByHost, ImmutableMap<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. @@ -129,8 +130,18 @@ public class SearchCluster implements NodeManager<Node> { private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) - nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group())); + Predicate<DispatchConfig.Node> filter; + if (dispatchConfig.useLocalNode()) { + final String hostName = HostName.getLocalhost(); + filter = node -> node.host().equals(hostName); + } else { + filter = node -> true; + } + for (DispatchConfig.Node node : dispatchConfig.node()) { + if (filter.test(node)) { + nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group())); + } + } return nodesBuilder.build(); } @@ -162,6 +173,16 @@ public class SearchCluster implements NodeManager<Node> { return size() / groups.size(); } + public int groupsWithSufficientCoverage() { + int covered = 0; + for (Group g : orderedGroups) { + if (g.hasSufficientCoverage()) { + covered++; + } + } + return covered; + } + /** * Returns the nodes of this cluster as an immutable map indexed by host. * One host may contain multiple nodes (on different ports), so this is a multi-map. @@ -271,12 +292,13 @@ public class SearchCluster implements NodeManager<Node> { Group group = orderedGroups.get(i); long activeDocuments = activeDocumentsInGroup[i]; long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); - boolean sufficientCoverage = isGroupCoverageSufficient(group.nodes(), activeDocuments, averageDocumentsInOtherGroups); + boolean sufficientCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), activeDocuments, + averageDocumentsInOtherGroups); updateSufficientCoverage(group, sufficientCoverage); } } - private boolean isGroupCoverageSufficient(List<Node> nodes, long activeDocuments, long averageDocumentsInOtherGroups) { + private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) { boolean sufficientCoverage = true; if (averageDocumentsInOtherGroups > 0) { @@ -284,22 +306,15 @@ public class SearchCluster implements NodeManager<Node> { sufficientCoverage = coverage >= dispatchConfig.minActivedocsPercentage(); } if (sufficientCoverage) { - sufficientCoverage = isGroupNodeCoverageSufficient(nodes); + sufficientCoverage = isGroupNodeCoverageSufficient(workingNodes, nodesInGroup); } return sufficientCoverage; } - private boolean isGroupNodeCoverageSufficient(List<Node> nodes) { - int nodesUp = 0; - for (Node node : nodes) { - if (node.isWorking()) { - nodesUp++; - } - } - int numNodes = nodes.size(); + private boolean isGroupNodeCoverageSufficient(int workingNodes, int nodesInGroup) { int nodesAllowedDown = dispatchConfig.maxNodesDownPerGroup() - + (int) (((double) numNodes * (100.0 - dispatchConfig.minGroupCoverage())) / 100.0); - return nodesUp + nodesAllowedDown >= numNodes; + + (int) (((double) nodesInGroup * (100.0 - dispatchConfig.minGroupCoverage())) / 100.0); + return workingNodes + nodesAllowedDown >= nodesInGroup; } private Pong getPong(FutureTask<Pong> futurePong, Node node) { @@ -316,13 +331,24 @@ public class SearchCluster implements NodeManager<Node> { } } + private void logIfInsufficientCoverage(boolean sufficient, int groupId, int nodes) { + if (!sufficient) { + log.warning(() -> String.format("Coverage of group %s is only %d/%d (requires %d)", groupId, nodes, groupSize(), + groupSize() - dispatchConfig.maxNodesDownPerGroup())); + } + } + /** * Calculate whether a subset of nodes in a group has enough coverage */ public boolean isPartialGroupCoverageSufficient(int groupId, List<Node> nodes) { if (orderedGroups.size() == 1) { - return nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup(); + boolean sufficient = nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup(); + logIfInsufficientCoverage(sufficient, groupId, nodes.size()); + return sufficient; } + + int nodesInGroup = groups.get(groupId).nodes().size(); long sumOfActiveDocuments = 0; int otherGroups = 0; for (Group g : orderedGroups) { @@ -336,6 +362,8 @@ public class SearchCluster implements NodeManager<Node> { activeDocuments += n.getActiveDocuments(); } long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups; - return isGroupCoverageSufficient(nodes, activeDocuments, averageDocumentsInOtherGroups); + boolean sufficient = isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups); + logIfInsufficientCoverage(sufficient, groupId, nodes.size()); + return sufficient; } } |