summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 12:03:36 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 12:03:36 +0200
commit04bed07efb4b196047681f91f79bd531966cd37c (patch)
tree19763f9b78160859cacbae4a422ef7035625f441 /documentapi
parent8750aaecebe8c7a87032e6a747d073cc5d184262 (diff)
And then some more final
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java52
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java2
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java7
3 files changed, 36 insertions, 25 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 9000fff91da..60589432ea7 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -74,7 +74,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
/** Helper class to match a host pattern with node to use. */
public abstract static class HostFetcher {
- private class Targets {
+ private static class Targets {
private final List<Integer> list;
private final int total;
Targets() {
@@ -123,7 +123,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
/** Host fetcher using a slobrok mirror to find the hosts. */
public static class SlobrokHostFetcher extends HostFetcher {
private final SlobrokHostPatternGenerator patternGenerator;
- final ExternalSlobrokPolicy policy;
+ private final ExternalSlobrokPolicy policy;
SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) {
super(percent);
@@ -231,7 +231,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
}
- public String getDistributionConfigId() {
+ String getDistributionConfigId() {
return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId);
}
public String getClusterName() {
@@ -303,8 +303,8 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
public static class DistributorSelectionLogic {
/** Class that tracks a failure of a given type per node. */
static class InstabilityChecker {
- private List<Integer> nodeFailures = new ArrayList<>();
- private int failureLimit;
+ private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
+ private final int failureLimit;
InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
@@ -324,10 +324,19 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
/** Message context class. Contains data we want to inspect about a request at reply time. */
private static class MessageContext {
- Integer calculatedDistributor;
- ClusterState usedState;
+ final Integer calculatedDistributor;
+ final ClusterState usedState;
- MessageContext(ClusterState usedState) { this.usedState = usedState; }
+ MessageContext() {
+ this(null, null);
+ }
+ MessageContext(ClusterState usedState) {
+ this(usedState, null);
+ }
+ MessageContext(ClusterState usedState, Integer calculatedDistributor) {
+ this.calculatedDistributor = calculatedDistributor;
+ this.usedState = usedState;
+ }
public String toString() {
return "Context(Distributor " + calculatedDistributor +
@@ -357,8 +366,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
String getTargetSpec(RoutingContext context, BucketId bucketId) {
String sendRandomReason = null;
ClusterState cachedClusterState = safeCachedClusterState.get();
- MessageContext messageContext = new MessageContext(cachedClusterState);
- context.setContext(messageContext);
+
if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
try{
Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
@@ -369,19 +377,20 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
// If we have found a target, and the target exists in slobrok, send to it.
if (target != null) {
- messageContext.calculatedDistributor = target;
+ context.setContext(new MessageContext(cachedClusterState, target));
String targetSpec = hostFetcher.getTargetSpec(target, context);
if (targetSpec != null) {
if (context.shouldTrace(1)) {
- context.trace(1, "Using distributor " + messageContext.calculatedDistributor + " for " +
+ context.trace(1, "Using distributor " + target + " for " +
bucketId + " as our state version is " + cachedClusterState.getVersion());
}
- messageContext.usedState = cachedClusterState;
return targetSpec;
} else {
- sendRandomReason = "Want to use distributor " + messageContext.calculatedDistributor + " but it is not in slobrok. Sending to random.";
+ sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
log.log(LogLevel.DEBUG, "Target distributor is not in slobrok");
}
+ } else {
+ context.setContext(new MessageContext(cachedClusterState));
}
} catch (Distribution.TooFewBucketBitsInUseException e) {
Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
@@ -395,6 +404,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
sendRandomReason = "No distributors available. Sending to random distributor.";
}
} else {
+ context.setContext(new MessageContext(null));
sendRandomReason = "No cluster state cached. Sending to random distributor.";
}
if (context.shouldTrace(1)) {
@@ -524,8 +534,8 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
- private DistributorSelectionLogic distributorSelectionLogic = null;
- private Parameters parameters;
+ private final AtomicReference<DistributorSelectionLogic> distributorSelectionLogic = new AtomicReference<>();
+ private final Parameters parameters;
/** Constructor used in production. */
public StoragePolicy(String param) {
@@ -545,7 +555,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
@Override
public void init() {
super.init();
- this.distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
+ this.distributorSelectionLogic.set(new DistributorSelectionLogic(parameters, this));
}
@Override
@@ -557,7 +567,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
if (context.hasReply()) return;
- String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
+ String targetSpec = distributorSelectionLogic.get().getTargetSpec(context, bucketId);
if (context.hasReply()) return;
if (targetSpec != null) {
Route route = new Route(context.getRoute());
@@ -580,9 +590,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
if (reply instanceof WrongDistributionReply) {
- distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
+ distributorSelectionLogic.get().handleWrongDistribution((WrongDistributionReply) reply, context);
} else if (reply.hasErrors()) {
- distributorSelectionLogic.handleErrorReply(reply, context.getContext());
+ distributorSelectionLogic.get().handleErrorReply(reply, context.getContext());
} else if (reply instanceof WriteDocumentReply) {
if (context.shouldTrace(9)) {
context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
@@ -593,6 +603,6 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
@Override
public void destroy() {
- distributorSelectionLogic.destroy();
+ distributorSelectionLogic.get().destroy();
}
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
index 4413b657739..20b3c2fb6b4 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
@@ -50,7 +50,7 @@ public class TargetCachingSlobrokHostFetcherTest {
ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class);
IMirror mockMirror = mock(IMirror.class);
StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo");
- StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy);
+ StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
RoutingContext routingContext = mock(RoutingContext.class);
Fixture() {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
index b2a137abc7d..304630861cf 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
@@ -110,6 +110,7 @@ public abstract class StoragePolicyTestEnvironment {
private Integer avoidPickingAtRandom = null;
public TestHostFetcher(String clusterName, Set<Integer> nodes) {
+ super(60);
this.clusterName = clusterName;
this.nodes = nodes;
}
@@ -154,7 +155,7 @@ public abstract class StoragePolicyTestEnvironment {
}
@Override
- public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy) { return hostFetcher; }
+ public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) { return hostFetcher; }
@Override
public Distribution createDistribution(ExternalSlobrokPolicy policy) { return distribution; }
@@ -170,13 +171,13 @@ public abstract class StoragePolicyTestEnvironment {
}
public DocumentProtocolRoutingPolicy createPolicy(String parameters) {
parameterInstances.addLast(new TestParameters(parameters, nodes));
- ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null)).setAvoidPickingAtRandom(avoidPickingAtRandom);
+ ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom);
return new StoragePolicy(parameterInstances.getLast(), AsyncInitializationPolicy.parse(parameters));
}
public void avoidPickingAtRandom(Integer distributor) {
avoidPickingAtRandom = distributor;
for (TestParameters params : parameterInstances) {
- ((TestHostFetcher) params.createHostFetcher(null)).setAvoidPickingAtRandom(avoidPickingAtRandom);
+ ((TestHostFetcher) params.createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom);
}
}
public TestParameters getLastParameters() { return parameterInstances.getLast(); }