aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-08 09:41:10 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2021-01-08 14:43:42 +0000
commit3f3021757fbfa2c1002c0bf3055a734a374fc8bd (patch)
tree56680ae3fdd437b141ece35a3be89f6b91699da9 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
parent6d60d82bebd617880b2737d9f549b8402a1e1f24 (diff)
Revert "Revert "Disable topk optimisation on dispatch when content distribution is se…""
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java27
1 files changed, 17 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index ec616a18e09..23255fff330 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -22,6 +22,8 @@ public class Group {
private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true);
private final AtomicLong activeDocuments = new AtomicLong(0);
private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
+ private final AtomicBoolean isContentWellBalanced = new AtomicBoolean(true);
+ private final static double MAX_UNBALANCE = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced
public Group(int id, List<Node> nodes) {
this.id = id;
@@ -53,25 +55,30 @@ public class Group {
}
public int workingNodes() {
- int nodesUp = 0;
- for (Node node : nodes) {
- if (node.isWorking() == Boolean.TRUE) {
- nodesUp++;
- }
- }
- return nodesUp;
+ return (int) nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).count();
}
void aggregateNodeValues() {
- activeDocuments.set(nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum());
- isBlockingWrites.set(nodes.stream().anyMatch(node -> node.isBlockingWrites()));
+ long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum();
+ activeDocuments.set(activeDocs);
+ isBlockingWrites.set(nodes.stream().anyMatch(Node::isBlockingWrites));
+ int numWorkingNodes = workingNodes();
+ if (numWorkingNodes > 0) {
+ long average = activeDocs / numWorkingNodes;
+ long deviation = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
+ isContentWellBalanced.set(deviation <= (activeDocs * MAX_UNBALANCE));
+ } else {
+ isContentWellBalanced.set(true);
+ }
+
}
- /** Returns the active documents on this node. If unknown, 0 is returned. */
+ /** Returns the active documents on this group. If unknown, 0 is returned. */
long getActiveDocuments() { return activeDocuments.get(); }
/** Returns whether any node in this group is currently blocking write operations */
public boolean isBlockingWrites() { return isBlockingWrites.get(); }
+ public boolean isContentWellBalanced() { return isContentWellBalanced.get(); }
public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) {
boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow);