aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 08:40:42 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 08:40:42 +0200
commit5c2cd3bd5a770a14e995a3ae36959a76053e2968 (patch)
tree8e1885a743fb2396e341a78cdccd37c0ee532951 /documentapi
parent393d7fed5fa13716e96a8a3235b4d8b11a2daa5f (diff)
Make cachedClusterState thread safe.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java14
1 files changed, 9 insertions, 5 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 5355da563c2..75de698cc5e 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -316,7 +316,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
private final HostFetcher hostFetcher;
private final Distribution distribution;
private final InstabilityChecker persistentFailureChecker;
- private ClusterState cachedClusterState = null;
+ private AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
private int oldClusterVersionGottenCount = 0;
private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
@@ -335,6 +335,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
String getTargetSpec(RoutingContext context, BucketId bucketId) {
String sendRandomReason = null;
+ ClusterState cachedClusterState = safeCachedClusterState.get();
MessageContext messageContext = new MessageContext(cachedClusterState);
context.setContext(messageContext);
if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
@@ -369,7 +370,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
return null;
} catch (Distribution.NoDistributorsAvailableException e) {
log.log(LogLevel.DEBUG, "No distributors available; clearing cluster state");
- cachedClusterState = null;
+ safeCachedClusterState.set(null);
sendRandomReason = "No distributors available. Sending to random distributor.";
}
} else {
@@ -409,13 +410,14 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
- cachedClusterState = newState;
+ safeCachedClusterState.set(newState);
if (newState.getClusterState().equals(State.UP)) {
hostFetcher.updateValidTargets(newState);
}
} else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
- cachedClusterState = null;
+ safeCachedClusterState.set(null);
} else if (context.calculatedDistributor != null) {
persistentFailureChecker.addFailure(context.calculatedDistributor);
}
@@ -452,10 +454,11 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
if (++oldClusterVersionGottenCount >= maxOldClusterVersionBeforeSendingRandom) {
oldClusterVersionGottenCount = 0;
- cachedClusterState = null;
+ safeCachedClusterState.set(null);
}
}
}
@@ -476,6 +479,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
if (!reply.getTrace().shouldTrace(1)) {
return;
}
+ ClusterState cachedClusterState = safeCachedClusterState.get();
if (cachedClusterState == null) {
reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
} else if (newState.getVersion() == cachedClusterState.getVersion()) {