diff options
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 5355da563c2..75de698cc5e 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -316,7 +316,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private final HostFetcher hostFetcher; private final Distribution distribution; private final InstabilityChecker persistentFailureChecker; - private ClusterState cachedClusterState = null; + private AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null); private int oldClusterVersionGottenCount = 0; private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection @@ -335,6 +335,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { String getTargetSpec(RoutingContext context, BucketId bucketId) { String sendRandomReason = null; + ClusterState cachedClusterState = safeCachedClusterState.get(); MessageContext messageContext = new MessageContext(cachedClusterState); context.setContext(messageContext); if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. @@ -369,7 +370,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { return null; } catch (Distribution.NoDistributorsAvailableException e) { log.log(LogLevel.DEBUG, "No distributors available; clearing cluster state"); - cachedClusterState = null; + safeCachedClusterState.set(null); sendRandomReason = "No distributors available. Sending to random distributor."; } } else { @@ -409,13 +410,14 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { - cachedClusterState = newState; + safeCachedClusterState.set(newState); if (newState.getClusterState().equals(State.UP)) { hostFetcher.updateValidTargets(newState); } } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { - cachedClusterState = null; + safeCachedClusterState.set(null); } else if (context.calculatedDistributor != null) { persistentFailureChecker.addFailure(context.calculatedDistributor); } @@ -452,10 +454,11 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { if (++oldClusterVersionGottenCount >= maxOldClusterVersionBeforeSendingRandom) { oldClusterVersionGottenCount = 0; - cachedClusterState = null; + safeCachedClusterState.set(null); } } } @@ -476,6 +479,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { if (!reply.getTrace().shouldTrace(1)) { return; } + ClusterState cachedClusterState = safeCachedClusterState.get(); if (cachedClusterState == null) { reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); } else if (newState.getVersion() == cachedClusterState.getVersion()) { |