diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 10:34:24 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 10:34:24 +0200 |
commit | 7466ce4cb8aa055137b79cfc1c719d68527b0164 (patch) | |
tree | 874b33db5176ae2a45f9e71ea390d95eeba571f4 /documentapi/src | |
parent | 5a89596b1b1b6d7dc20afa33cb1d4b1388cb5fa7 (diff) |
And then there were more.....
Diffstat (limited to 'documentapi/src')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 2d6d47cf8ab..81af29202d1 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -53,26 +53,31 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** This class merely generates slobrok a host pattern for a given distributor. */ public static class SlobrokHostPatternGenerator { - private final String clusterName; - SlobrokHostPatternGenerator(String clusterName) { this.clusterName = clusterName; } + private final String base; + private final String all; + SlobrokHostPatternGenerator(String clusterName) { + base = "storage/cluster." + clusterName + "/distributor/"; + all = base + "*/default"; + + } /** * Find host pattern of the hosts that are valid targets for this request. * @param distributor Set to -1 if any distributor is valid target. */ public String getDistributorHostPattern(Integer distributor) { - return "storage/cluster." + clusterName + "/distributor/" + (distributor == null ? "*" : distributor) + "/default"; + return (distributor == null) ? all : (base + distributor + "/default"); } } /** Helper class to match a host pattern with node to use. */ public abstract static class HostFetcher { - private int requiredUpPercentageToSendToKnownGoodNodes = 60; + private AtomicInteger safeRequiredUpPercentageToSendToKnownGoodNodes = new AtomicInteger(60); private AtomicReference<List<Integer>> safeValidRandomTargets = new AtomicReference<>(new CopyOnWriteArrayList<>()); - private int totalTargets = 1; + private AtomicInteger safeTotalTargets = new AtomicInteger(1); protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. - void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.requiredUpPercentageToSendToKnownGoodNodes = percent; } + void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.safeRequiredUpPercentageToSendToKnownGoodNodes.set(percent); } void updateValidTargets(ClusterState state) { List<Integer> validRandomTargets = new ArrayList<>(); @@ -80,12 +85,14 @@ public class StoragePolicy extends ExternalSlobrokPolicy { if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); } this.safeValidRandomTargets.set(new CopyOnWriteArrayList<>(validRandomTargets)); - this.totalTargets = state.getNodeCount(NodeType.DISTRIBUTOR); + safeTotalTargets.set(state.getNodeCount(NodeType.DISTRIBUTOR)); } public abstract String getTargetSpec(Integer distributor, RoutingContext context); String getRandomTargetSpec(RoutingContext context) { List<Integer> validRandomTargets = safeValidRandomTargets.get(); // Try to use list of random targets, if at least X % of the nodes are up + int totalTargets = safeTotalTargets.get(); + int requiredUpPercentageToSendToKnownGoodNodes = safeRequiredUpPercentageToSendToKnownGoodNodes.get(); while (100 * validRandomTargets.size() / totalTargets >= requiredUpPercentageToSendToKnownGoodNodes) { int randIndex = randomizer.nextInt(validRandomTargets.size()); String targetSpec = getTargetSpec(validRandomTargets.get(randIndex), context); @@ -104,7 +111,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Host fetcher using a slobrok mirror to find the hosts. */ public static class SlobrokHostFetcher extends HostFetcher { private final SlobrokHostPatternGenerator patternGenerator; - ExternalSlobrokPolicy policy; + final ExternalSlobrokPolicy policy; SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) { this.patternGenerator = patternGenerator; @@ -119,6 +126,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { public IMirror getMirror(RoutingContext context) { return context.getMirror(); } + @Override public String getTargetSpec(Integer distributor, RoutingContext context) { List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); if (arr.isEmpty()) return null; @@ -159,7 +167,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } - private volatile GenerationCache generationCache = null; + private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) { super(patternGenerator, policy); @@ -167,13 +175,13 @@ public class StoragePolicy extends ExternalSlobrokPolicy { @Override public String getTargetSpec(Integer distributor, RoutingContext context) { - GenerationCache cache = generationCache; + GenerationCache cache = generationCache.get(); int currentGeneration = getMirror(context).updates(); // The below code might race with other threads during a generation change. That is OK, as the cache // is thread safe and will quickly converge to a stable state for the new generation. if (cache == null || currentGeneration != cache.generation()) { cache = new GenerationCache(currentGeneration); - generationCache = cache; + generationCache.set(cache); } if (distributor != null) { return cachingGetTargetSpec(distributor, context, cache); |