diff options
Diffstat (limited to 'documentapi')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java index 21e621883fe..3ce27ce6bdb 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Routing policy to determine which distributor in a content cluster to send data to. @@ -74,27 +75,27 @@ public class ContentPolicy extends SlobrokPolicy { public abstract static class HostFetcher { private static class Targets { - private final List<Integer> list; - private final AtomicInteger size; + private final AtomicReference<List<Integer>> list = new AtomicReference<>(); final int total; Targets() { this(List.of(), 0); } Targets(List<Integer> list, int total) { - this.list = new CopyOnWriteArrayList<>(list); - this.size = new AtomicInteger(list.size()); + this.list.set(List.copyOf(list)); this.total = Math.max(1, total); } - Integer get(int i) { - return list.get(i); + Integer get(Random randomizer) { + List<Integer> snapshot = list.get(); + return snapshot.get(randomizer.nextInt(snapshot.size())); } - void remove(Integer v) { - size.decrementAndGet(); - list.add(null); // Avoid index out of bounds for racing getters. - list.remove(v); + synchronized void remove(Integer v) { + List<Integer> snapshot = list.get(); + if (snapshot.contains(v)) { + list.set(snapshot.stream().filter((item) -> !v.equals(item)).collect(Collectors.toList())); + } } int size() { - return size.get(); + return list.get().size(); } } @@ -119,8 +120,7 @@ public class ContentPolicy extends SlobrokPolicy { // Try to use list of random targets, if at least X % of the nodes are up while (100 * targets.size() >= requiredUpPercentageToSendToKnownGoodNodes * targets.total) { - Integer distributor = targets.get(randomizer.nextInt(targets.size())); - if (distributor == null) continue; + Integer distributor = targets.get(randomizer); String targetSpec = getTargetSpec(distributor, context); if (targetSpec != null) { context.trace(3, "Sending to random node seen up in cluster state"); |