diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-08 09:41:10 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-08 14:43:42 +0000 |
commit | 3f3021757fbfa2c1002c0bf3055a734a374fc8bd (patch) | |
tree | 56680ae3fdd437b141ece35a3be89f6b91699da9 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java | |
parent | 6d60d82bebd617880b2737d9f549b8402a1e1f24 (diff) |
Revert "Revert "Disable topk optimisation on dispatch when content distribution is se…""
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index ec616a18e09..23255fff330 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -22,6 +22,8 @@ public class Group { private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false); + private final AtomicBoolean isContentWellBalanced = new AtomicBoolean(true); + private final static double MAX_UNBALANCE = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced public Group(int id, List<Node> nodes) { this.id = id; @@ -53,25 +55,30 @@ public class Group { } public int workingNodes() { - int nodesUp = 0; - for (Node node : nodes) { - if (node.isWorking() == Boolean.TRUE) { - nodesUp++; - } - } - return nodesUp; + return (int) nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).count(); } void aggregateNodeValues() { - activeDocuments.set(nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum()); - isBlockingWrites.set(nodes.stream().anyMatch(node -> node.isBlockingWrites())); + long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum(); + activeDocuments.set(activeDocs); + isBlockingWrites.set(nodes.stream().anyMatch(Node::isBlockingWrites)); + int numWorkingNodes = workingNodes(); + if (numWorkingNodes > 0) { + long average = activeDocs / numWorkingNodes; + long deviation = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); + isContentWellBalanced.set(deviation <= (activeDocs * MAX_UNBALANCE)); + } else { + isContentWellBalanced.set(true); + } + } - /** Returns the active documents on this node. If unknown, 0 is returned. */ + /** Returns the active documents on this group. If unknown, 0 is returned. */ long getActiveDocuments() { return activeDocuments.get(); } /** Returns whether any node in this group is currently blocking write operations */ public boolean isBlockingWrites() { return isBlockingWrites.get(); } + public boolean isContentWellBalanced() { return isContentWellBalanced.get(); } public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) { boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow); |