diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 12:03:36 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 12:03:36 +0200 |
commit | 04bed07efb4b196047681f91f79bd531966cd37c (patch) | |
tree | 19763f9b78160859cacbae4a422ef7035625f441 /documentapi | |
parent | 8750aaecebe8c7a87032e6a747d073cc5d184262 (diff) |
And then some more final
Diffstat (limited to 'documentapi')
3 files changed, 36 insertions, 25 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 9000fff91da..60589432ea7 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -74,7 +74,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Helper class to match a host pattern with node to use. */ public abstract static class HostFetcher { - private class Targets { + private static class Targets { private final List<Integer> list; private final int total; Targets() { @@ -123,7 +123,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Host fetcher using a slobrok mirror to find the hosts. */ public static class SlobrokHostFetcher extends HostFetcher { private final SlobrokHostPatternGenerator patternGenerator; - final ExternalSlobrokPolicy policy; + private final ExternalSlobrokPolicy policy; SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) { super(percent); @@ -231,7 +231,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); } - public String getDistributionConfigId() { + String getDistributionConfigId() { return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId); } public String getClusterName() { @@ -303,8 +303,8 @@ public class StoragePolicy extends ExternalSlobrokPolicy { public static class DistributorSelectionLogic { /** Class that tracks a failure of a given type per node. */ static class InstabilityChecker { - private List<Integer> nodeFailures = new ArrayList<>(); - private int failureLimit; + private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>(); + private final int failureLimit; InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } @@ -324,10 +324,19 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } /** Message context class. Contains data we want to inspect about a request at reply time. */ private static class MessageContext { - Integer calculatedDistributor; - ClusterState usedState; + final Integer calculatedDistributor; + final ClusterState usedState; - MessageContext(ClusterState usedState) { this.usedState = usedState; } + MessageContext() { + this(null, null); + } + MessageContext(ClusterState usedState) { + this(usedState, null); + } + MessageContext(ClusterState usedState, Integer calculatedDistributor) { + this.calculatedDistributor = calculatedDistributor; + this.usedState = usedState; + } public String toString() { return "Context(Distributor " + calculatedDistributor + @@ -357,8 +366,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { String getTargetSpec(RoutingContext context, BucketId bucketId) { String sendRandomReason = null; ClusterState cachedClusterState = safeCachedClusterState.get(); - MessageContext messageContext = new MessageContext(cachedClusterState); - context.setContext(messageContext); + if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. try{ Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); @@ -369,19 +377,20 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } // If we have found a target, and the target exists in slobrok, send to it. if (target != null) { - messageContext.calculatedDistributor = target; + context.setContext(new MessageContext(cachedClusterState, target)); String targetSpec = hostFetcher.getTargetSpec(target, context); if (targetSpec != null) { if (context.shouldTrace(1)) { - context.trace(1, "Using distributor " + messageContext.calculatedDistributor + " for " + + context.trace(1, "Using distributor " + target + " for " + bucketId + " as our state version is " + cachedClusterState.getVersion()); } - messageContext.usedState = cachedClusterState; return targetSpec; } else { - sendRandomReason = "Want to use distributor " + messageContext.calculatedDistributor + " but it is not in slobrok. Sending to random."; + sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random."; log.log(LogLevel.DEBUG, "Target distributor is not in slobrok"); } + } else { + context.setContext(new MessageContext(cachedClusterState)); } } catch (Distribution.TooFewBucketBitsInUseException e) { Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); @@ -395,6 +404,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { sendRandomReason = "No distributors available. Sending to random distributor."; } } else { + context.setContext(new MessageContext(null)); sendRandomReason = "No cluster state cached. Sending to random distributor."; } if (context.shouldTrace(1)) { @@ -524,8 +534,8 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); - private DistributorSelectionLogic distributorSelectionLogic = null; - private Parameters parameters; + private final AtomicReference<DistributorSelectionLogic> distributorSelectionLogic = new AtomicReference<>(); + private final Parameters parameters; /** Constructor used in production. */ public StoragePolicy(String param) { @@ -545,7 +555,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { @Override public void init() { super.init(); - this.distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); + this.distributorSelectionLogic.set(new DistributorSelectionLogic(parameters, this)); } @Override @@ -557,7 +567,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); if (context.hasReply()) return; - String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); + String targetSpec = distributorSelectionLogic.get().getTargetSpec(context, bucketId); if (context.hasReply()) return; if (targetSpec != null) { Route route = new Route(context.getRoute()); @@ -580,9 +590,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } if (reply instanceof WrongDistributionReply) { - distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); + distributorSelectionLogic.get().handleWrongDistribution((WrongDistributionReply) reply, context); } else if (reply.hasErrors()) { - distributorSelectionLogic.handleErrorReply(reply, context.getContext()); + distributorSelectionLogic.get().handleErrorReply(reply, context.getContext()); } else if (reply instanceof WriteDocumentReply) { if (context.shouldTrace(9)) { context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); @@ -593,6 +603,6 @@ public class StoragePolicy extends ExternalSlobrokPolicy { @Override public void destroy() { - distributorSelectionLogic.destroy(); + distributorSelectionLogic.get().destroy(); } } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java index 4413b657739..20b3c2fb6b4 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -50,7 +50,7 @@ public class TargetCachingSlobrokHostFetcherTest { ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class); IMirror mockMirror = mock(IMirror.class); StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); - StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy); + StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); RoutingContext routingContext = mock(RoutingContext.class); Fixture() { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java index b2a137abc7d..304630861cf 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java @@ -110,6 +110,7 @@ public abstract class StoragePolicyTestEnvironment { private Integer avoidPickingAtRandom = null; public TestHostFetcher(String clusterName, Set<Integer> nodes) { + super(60); this.clusterName = clusterName; this.nodes = nodes; } @@ -154,7 +155,7 @@ public abstract class StoragePolicyTestEnvironment { } @Override - public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy) { return hostFetcher; } + public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) { return hostFetcher; } @Override public Distribution createDistribution(ExternalSlobrokPolicy policy) { return distribution; } @@ -170,13 +171,13 @@ public abstract class StoragePolicyTestEnvironment { } public DocumentProtocolRoutingPolicy createPolicy(String parameters) { parameterInstances.addLast(new TestParameters(parameters, nodes)); - ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null)).setAvoidPickingAtRandom(avoidPickingAtRandom); + ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); return new StoragePolicy(parameterInstances.getLast(), AsyncInitializationPolicy.parse(parameters)); } public void avoidPickingAtRandom(Integer distributor) { avoidPickingAtRandom = distributor; for (TestParameters params : parameterInstances) { - ((TestHostFetcher) params.createHostFetcher(null)).setAvoidPickingAtRandom(avoidPickingAtRandom); + ((TestHostFetcher) params.createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); } } public TestParameters getLastParameters() { return parameterInstances.getLast(); } |