aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-11-25 13:02:58 +0100
committerGitHub <noreply@github.com>2022-11-25 13:02:58 +0100
commit841bd52428696d6f6617da90456605666ac9c159 (patch)
treea3a79d3dde959f4db63cb7b10170ca2d0b682a4d
parentcc1a7c6d4e8d16ae578a882cf791c9da95d90949 (diff)
parent20abea85ce33f8967eed91672cba452573e80a75 (diff)
Merge pull request #24990 from vespa-engine/balder/use-volatile
Using volatiles directly is fine for ensuring thread visibility.
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java45
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java37
2 files changed, 41 insertions, 41 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index fbea58054da..3d3efce2ab2 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -2,8 +2,6 @@
package com.yahoo.search.dispatch.searchcluster;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
/**
@@ -21,12 +19,15 @@ public class Group {
private final int id;
private final List<Node> nodes;
- private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
- private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true);
- private final AtomicLong activeDocuments = new AtomicLong(0);
- private final AtomicLong targetActiveDocuments = new AtomicLong(0);
- private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
- private final AtomicBoolean isBalanced = new AtomicBoolean(true);
+
+ // Using volatile to ensure visibility for reader.
+ // All udates are done in a single writer thread
+ private volatile boolean hasSufficientCoverage = true;
+ private volatile boolean hasFullCoverage = true;
+ private volatile long activeDocuments = 0;
+ private volatile long targetActiveDocuments = 0;
+ private volatile boolean isBlockingWrites = false;
+ private volatile boolean isBalanced = true;
public Group(int id, List<Node> nodes) {
this.id = id;
@@ -53,11 +54,11 @@ public class Group {
* (compared to other groups) that is should receive traffic
*/
public boolean hasSufficientCoverage() {
- return hasSufficientCoverage.get();
+ return hasSufficientCoverage;
}
void setHasSufficientCoverage(boolean sufficientCoverage) {
- hasSufficientCoverage.lazySet(sufficientCoverage);
+ hasSufficientCoverage = sufficientCoverage;
}
public int workingNodes() {
@@ -66,39 +67,38 @@ public class Group {
public void aggregateNodeValues() {
long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum();
- activeDocuments.set(activeDocs);
- long targetActiveDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum();
- targetActiveDocuments.set(targetActiveDocs);
- isBlockingWrites.set(nodes.stream().anyMatch(Node::isBlockingWrites));
+ activeDocuments = activeDocs;
+ targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum();
+ isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites);
int numWorkingNodes = workingNodes();
if (numWorkingNodes > 0) {
long average = activeDocs / numWorkingNodes;
long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
boolean balanced = skew <= activeDocs * maxContentSkew;
- if (balanced != isBalanced.get()) {
+ if (balanced != isBalanced) {
if (!isSparse())
log.info("Content in " + this + ", with " + numWorkingNodes + "/" + nodes.size() + " working nodes, is " +
(balanced ? "" : "not ") + "well balanced. Current deviation: " + skew * 100 / activeDocs +
"%. Active documents: " + activeDocs + ", skew: " + skew + ", average: " + average +
(balanced ? "" : ". Top-k summary fetch optimization is deactivated."));
- isBalanced.set(balanced);
+ isBalanced = balanced;
}
} else {
- isBalanced.set(true);
+ isBalanced = true;
}
}
/** Returns the active documents on this group. If unknown, 0 is returned. */
- long activeDocuments() { return activeDocuments.get(); }
+ long activeDocuments() { return activeDocuments; }
/** Returns the target active documents on this group. If unknown, 0 is returned. */
- long targetActiveDocuments() { return targetActiveDocuments.get(); }
+ long targetActiveDocuments() { return targetActiveDocuments; }
/** Returns whether any node in this group is currently blocking write operations */
- public boolean isBlockingWrites() { return isBlockingWrites.get(); }
+ public boolean isBlockingWrites() { return isBlockingWrites; }
/** Returns whether the nodes in the group have about the same number of documents */
- public boolean isBalanced() { return isBalanced.get(); }
+ public boolean isBalanced() { return isBalanced; }
/** Returns whether this group has too few documents per node to expect it to be balanced */
public boolean isSparse() {
@@ -107,7 +107,8 @@ public class Group {
}
public boolean fullCoverageStatusChanged(boolean hasFullCoverageNow) {
- boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow);
+ boolean previousState = hasFullCoverage;
+ hasFullCoverage = hasFullCoverageNow;
return previousState != hasFullCoverageNow;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index 73a3c3742cc..38d51585ae4 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -2,7 +2,6 @@
package com.yahoo.search.dispatch.searchcluster;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -14,17 +13,17 @@ import java.util.concurrent.atomic.AtomicLong;
public class Node {
private final int key;
- private int pathIndex;
private final String hostname;
private final int group;
+ private int pathIndex;
- private final AtomicBoolean statusIsKnown = new AtomicBoolean(false);
- private final AtomicBoolean working = new AtomicBoolean(true);
- private final AtomicLong activeDocuments = new AtomicLong(0);
- private final AtomicLong targetActiveDocuments = new AtomicLong(0);
private final AtomicLong pingSequence = new AtomicLong(0);
private final AtomicLong lastPong = new AtomicLong(0);
- private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
+ private volatile long activeDocuments = 0;
+ private volatile long targetActiveDocuments = 0;
+ private volatile boolean statusIsKnown = false;
+ private volatile boolean working = true;
+ private volatile boolean isBlockingWrites = false;
public Node(int key, String hostname, int group) {
this.key = key;
@@ -63,30 +62,30 @@ public class Node {
public int group() { return group; }
public void setWorking(boolean working) {
- this.statusIsKnown.lazySet(true);
- this.working.lazySet(working);
+ this.statusIsKnown = true;
+ this.working = working;
if ( ! working ) {
- activeDocuments.set(0);
- targetActiveDocuments.set(0);
+ activeDocuments = 0;
+ targetActiveDocuments = 0;
}
}
/** Returns whether this node is currently responding to requests, or null if status is not known */
public Boolean isWorking() {
- return statusIsKnown.get() ? working.get() : null;
+ return statusIsKnown ? working : null;
}
/** Updates the active documents on this node */
- public void setActiveDocuments(long documents) { this.activeDocuments.set(documents); }
- public void setTargetActiveDocuments(long documents) { this.targetActiveDocuments.set(documents); }
+ public void setActiveDocuments(long documents) { this.activeDocuments = documents; }
+ public void setTargetActiveDocuments(long documents) { this.targetActiveDocuments = documents; }
/** Returns the active documents on this node. If unknown, 0 is returned. */
- long getActiveDocuments() { return activeDocuments.get(); }
- long getTargetActiveDocuments() { return targetActiveDocuments.get(); }
+ long getActiveDocuments() { return activeDocuments; }
+ long getTargetActiveDocuments() { return targetActiveDocuments; }
- public void setBlockingWrites(boolean isBlockingWrites) { this.isBlockingWrites.set(isBlockingWrites); }
+ public void setBlockingWrites(boolean isBlockingWrites) { this.isBlockingWrites = isBlockingWrites; }
- boolean isBlockingWrites() { return isBlockingWrites.get(); }
+ boolean isBlockingWrites() { return isBlockingWrites; }
@Override
public int hashCode() { return Objects.hash(hostname, key, pathIndex, group); }
@@ -106,7 +105,7 @@ public class Node {
@Override
public String toString() {
return "search node key = " + key + " hostname = "+ hostname + " path = " + pathIndex + " in group " + group +
- " statusIsKnown = " + statusIsKnown.get() + " working = " + working.get() +
+ " statusIsKnown = " + statusIsKnown + " working = " + working +
" activeDocs = " + getActiveDocuments() + " targetActiveDocs = " + getTargetActiveDocuments();
}