aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-12 06:37:25 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-12 07:07:29 +0100
commit1a179e337b3080cca3713103d7c042d61a84ed0d (patch)
treeb18f758234e8996f7e75218251b3a4fa9a530504 /documentapi
parentebc5f58ceed40d4eb2328a4be425b88a25bb2e41 (diff)
- Synchronize when removing nodes not present anymore. This simplifies code at neglible cost that removes the race on size().
- Also use explicit AtomicReference to get an Immutable snapshot when doing a random sample. - Done after observing IllegalArgumentException due to negative argument to Random.nextInt(bound).
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java26
1 files changed, 13 insertions, 13 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index 21e621883fe..3ce27ce6bdb 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* Routing policy to determine which distributor in a content cluster to send data to.
@@ -74,27 +75,27 @@ public class ContentPolicy extends SlobrokPolicy {
public abstract static class HostFetcher {
private static class Targets {
- private final List<Integer> list;
- private final AtomicInteger size;
+ private final AtomicReference<List<Integer>> list = new AtomicReference<>();
final int total;
Targets() {
this(List.of(), 0);
}
Targets(List<Integer> list, int total) {
- this.list = new CopyOnWriteArrayList<>(list);
- this.size = new AtomicInteger(list.size());
+ this.list.set(List.copyOf(list));
this.total = Math.max(1, total);
}
- Integer get(int i) {
- return list.get(i);
+ Integer get(Random randomizer) {
+ List<Integer> snapshot = list.get();
+ return snapshot.get(randomizer.nextInt(snapshot.size()));
}
- void remove(Integer v) {
- size.decrementAndGet();
- list.add(null); // Avoid index out of bounds for racing getters.
- list.remove(v);
+ synchronized void remove(Integer v) {
+ List<Integer> snapshot = list.get();
+ if (snapshot.contains(v)) {
+ list.set(snapshot.stream().filter((item) -> !v.equals(item)).collect(Collectors.toList()));
+ }
}
int size() {
- return size.get();
+ return list.get().size();
}
}
@@ -119,8 +120,7 @@ public class ContentPolicy extends SlobrokPolicy {
// Try to use list of random targets, if at least X % of the nodes are up
while (100 * targets.size() >= requiredUpPercentageToSendToKnownGoodNodes * targets.total)
{
- Integer distributor = targets.get(randomizer.nextInt(targets.size()));
- if (distributor == null) continue;
+ Integer distributor = targets.get(randomizer);
String targetSpec = getTargetSpec(distributor, context);
if (targetSpec != null) {
context.trace(3, "Sending to random node seen up in cluster state");