summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 10:34:24 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 10:34:24 +0200
commit7466ce4cb8aa055137b79cfc1c719d68527b0164 (patch)
tree874b33db5176ae2a45f9e71ea390d95eeba571f4 /documentapi
parent5a89596b1b1b6d7dc20afa33cb1d4b1388cb5fa7 (diff)
And then there were more.....
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java30
1 files changed, 19 insertions, 11 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 2d6d47cf8ab..81af29202d1 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -53,26 +53,31 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
/** This class merely generates slobrok a host pattern for a given distributor. */
public static class SlobrokHostPatternGenerator {
- private final String clusterName;
- SlobrokHostPatternGenerator(String clusterName) { this.clusterName = clusterName; }
+ private final String base;
+ private final String all;
+ SlobrokHostPatternGenerator(String clusterName) {
+ base = "storage/cluster." + clusterName + "/distributor/";
+ all = base + "*/default";
+
+ }
/**
* Find host pattern of the hosts that are valid targets for this request.
* @param distributor Set to -1 if any distributor is valid target.
*/
public String getDistributorHostPattern(Integer distributor) {
- return "storage/cluster." + clusterName + "/distributor/" + (distributor == null ? "*" : distributor) + "/default";
+ return (distributor == null) ? all : (base + distributor + "/default");
}
}
/** Helper class to match a host pattern with node to use. */
public abstract static class HostFetcher {
- private int requiredUpPercentageToSendToKnownGoodNodes = 60;
+ private AtomicInteger safeRequiredUpPercentageToSendToKnownGoodNodes = new AtomicInteger(60);
private AtomicReference<List<Integer>> safeValidRandomTargets = new AtomicReference<>(new CopyOnWriteArrayList<>());
- private int totalTargets = 1;
+ private AtomicInteger safeTotalTargets = new AtomicInteger(1);
protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
- void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.requiredUpPercentageToSendToKnownGoodNodes = percent; }
+ void setRequiredUpPercentageToSendToKnownGoodNodes(int percent) { this.safeRequiredUpPercentageToSendToKnownGoodNodes.set(percent); }
void updateValidTargets(ClusterState state) {
List<Integer> validRandomTargets = new ArrayList<>();
@@ -80,12 +85,14 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
}
this.safeValidRandomTargets.set(new CopyOnWriteArrayList<>(validRandomTargets));
- this.totalTargets = state.getNodeCount(NodeType.DISTRIBUTOR);
+ safeTotalTargets.set(state.getNodeCount(NodeType.DISTRIBUTOR));
}
public abstract String getTargetSpec(Integer distributor, RoutingContext context);
String getRandomTargetSpec(RoutingContext context) {
List<Integer> validRandomTargets = safeValidRandomTargets.get();
// Try to use list of random targets, if at least X % of the nodes are up
+ int totalTargets = safeTotalTargets.get();
+ int requiredUpPercentageToSendToKnownGoodNodes = safeRequiredUpPercentageToSendToKnownGoodNodes.get();
while (100 * validRandomTargets.size() / totalTargets >= requiredUpPercentageToSendToKnownGoodNodes) {
int randIndex = randomizer.nextInt(validRandomTargets.size());
String targetSpec = getTargetSpec(validRandomTargets.get(randIndex), context);
@@ -104,7 +111,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
/** Host fetcher using a slobrok mirror to find the hosts. */
public static class SlobrokHostFetcher extends HostFetcher {
private final SlobrokHostPatternGenerator patternGenerator;
- ExternalSlobrokPolicy policy;
+ final ExternalSlobrokPolicy policy;
SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) {
this.patternGenerator = patternGenerator;
@@ -119,6 +126,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
+ @Override
public String getTargetSpec(Integer distributor, RoutingContext context) {
List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
if (arr.isEmpty()) return null;
@@ -159,7 +167,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
}
- private volatile GenerationCache generationCache = null;
+ private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) {
super(patternGenerator, policy);
@@ -167,13 +175,13 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
@Override
public String getTargetSpec(Integer distributor, RoutingContext context) {
- GenerationCache cache = generationCache;
+ GenerationCache cache = generationCache.get();
int currentGeneration = getMirror(context).updates();
// The below code might race with other threads during a generation change. That is OK, as the cache
// is thread safe and will quickly converge to a stable state for the new generation.
if (cache == null || currentGeneration != cache.generation()) {
cache = new GenerationCache(currentGeneration);
- generationCache = cache;
+ generationCache.set(cache);
}
if (distributor != null) {
return cachingGetTargetSpec(distributor, context, cache);