summary | refs | log | tree | commit | diff | stats
path: root/documentapi
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahoo-inc.com> 2017-01-23 10:55:38 +0100
committer GitHub <noreply@github.com> 2017-01-23 10:55:38 +0100
commit 9370337f5571137388f052562320d80bdc9f7b40 (patch)
tree cffdd7e2a712e88c32b26537e1700093916c7cd9 /documentapi
parent a7cfab89184c64472cf088ce572210000ad5edec (diff)
parent ab968a8b15674d569acb9b3faecd053489745ebd (diff)
Merge pull request #1494 from yahoo/vekterli/refactor-storagepolicy-wrong-distribution-reply-handling
Refactor StoragePolicy handling of WrongDistributionReply. No logic changes.
Diffstat (limited to 'documentapi')
-rw-r--r-- documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java 149
1 files changed, 87 insertions, 62 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 7f8121c2138..de0ce196678 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -20,6 +20,7 @@ import com.yahoo.vdslib.state.State;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.logging.Logger;
@@ -73,7 +74,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
public abstract String getTargetSpec(Integer distributor, RoutingContext context);
public String getRandomTargetSpec(RoutingContext context) {
- // Try to use list of random targets, if at least X % of the nodes are up
+ // Try to use list of random targets, if at least X % of the nodes are up
while (100 * validRandomTargets.size() / totalTargets >= requiredUpPercentageToSendToKnownGoodNodes) {
int randIndex = randomizer.nextInt(validRandomTargets.size());
String targetSpec = getTargetSpec(validRandomTargets.get(randIndex), context);
@@ -269,12 +270,12 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
try{
Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
- // If we have had too many failures towards existing node, reset failure count and send to random
+ // If we have had too many failures towards existing node, reset failure count and send to random
if (persistentFailureChecker.tooManyFailures(target)) {
sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
target = null;
}
- // If we have found a target, and the target exist in slobrok, send to it.
+ // If we have found a target, and the target exists in slobrok, send to it.
if (target != null) {
messageContext.calculatedDistributor = target;
String targetSpec = hostFetcher.getTargetSpec(target, context);
@@ -310,21 +311,86 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
return hostFetcher.getRandomTargetSpec(context);
}
- public void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
- MessageContext context = (MessageContext) routingContext.getContext();
- ClusterState newState;
+ private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
try {
- newState = new ClusterState(reply.getSystemState());
+ return Optional.of(new ClusterState(reply.getSystemState()));
} catch (Exception e) {
reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
+ return Optional.empty();
+ }
+ }
+
+ public void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
+ final MessageContext context = (MessageContext) routingContext.getContext();
+ final Optional<ClusterState> replyState = clusterStateFromReply(reply);
+ if (!replyState.isPresent()) {
return;
}
+ final ClusterState newState = replyState.get();
+ resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
+ markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
+
+ if (context.calculatedDistributor == null) {
+ traceReplyFromRandomDistributor(reply, newState);
+ } else {
+ traceReplyFromSpecificDistributor(reply, context, newState);
+ }
+ updateCachedRoutingStateFromWrongDistribution(context, newState);
+ }
+
+ private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
+ if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
+ cachedClusterState = newState;
+ if (newState.getClusterState().equals(State.UP)) {
+ hostFetcher.updateValidTargets(newState);
+ }
+ } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
+ cachedClusterState = null;
+ } else if (context.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(context.calculatedDistributor);
+ }
+ }
+
+ private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState == null) {
+ String msg = "Used state must be set as distributor is calculated. Bug.";
+ reply.getTrace().trace(1, msg);
+ log.log(LogLevel.ERROR, msg);
+ } else if (newState.getVersion() == context.usedState.getVersion()) {
+ String msg = "Message sent to distributor " + context.calculatedDistributor +
+ " retrieved cluster state version " + newState.getVersion() +
+ " which was the state we used to calculate distributor as target last time.";
+ reply.getTrace().trace(1, msg);
+ // Client load can be rejected towards distributors even with a matching cluster state version.
+ // This usually happens during a node fail-over transition, where the target distributor will
+ // reject an operation bound to a particular bucket if it does not own the bucket in _both_
+ // the current and the next (transition target) state. Since it can happen during normal operation
+ // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
+ log.log(LogLevel.DEBUG, msg);
+ } else if (newState.getVersion() > context.usedState.getVersion()) {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " updated cluster state from version " + context.usedState.getVersion() +
+ " to " + newState.getVersion());
+ }
+ } else {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " returned older cluster state version " + newState.getVersion());
+ }
+ }
+ }
+
+ private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
if (++oldClusterVersionGottenCount >= maxOldClusterVersionBeforeSendingRandom) {
oldClusterVersionGottenCount = 0;
cachedClusterState = null;
}
}
+ }
+
+ private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
if (reply.getRetryDelay() <= 0.0) {
reply.setRetryDelay(-1);
@@ -334,64 +400,23 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
reply.setRetryDelay(0);
}
}
- if (context.calculatedDistributor == null) {
- if (cachedClusterState == null) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
- }
- } else if (newState.getVersion() == cachedClusterState.getVersion()) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
- }
- } else if (newState.getVersion() > cachedClusterState.getVersion()) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
- }
- } else {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
- }
- }
- } else {
- if (context.usedState == null) {
- String msg = "Used state must be set as distributor is calculated. Bug.";
- reply.getTrace().trace(1, msg);
- log.log(LogLevel.ERROR, msg);
- } else if (newState.getVersion() == context.usedState.getVersion()) {
- String msg = "Message sent to distributor " + context.calculatedDistributor +
- " retrieved cluster state version " + newState.getVersion() +
- " which was the state we used to calculate distributor as target last time.";
- reply.getTrace().trace(1, msg);
- // Client load can be rejected towards distributors even with a matching cluster state version.
- // This usually happens during a node fail-over transition, where the target distributor will
- // reject an operation bound to a particular bucket if it does not own the bucket in _both_
- // the current and the next (transition target) state. Since it can happen during normal operation
- // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
- log.log(LogLevel.DEBUG, msg);
- } else if (newState.getVersion() > context.usedState.getVersion()) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " updated cluster state from version " + context.usedState.getVersion() +
- " to " + newState.getVersion());
- }
- } else {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " returned older cluster state version " + newState.getVersion());
- }
- }
+ }
+
+ private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
+ if (!reply.getTrace().shouldTrace(1)) {
+ return;
}
- if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
- cachedClusterState = newState;
- if (newState.getClusterState().equals(State.UP)) {
- hostFetcher.updateValidTargets(newState);
- }
- } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
- cachedClusterState = null;
- } else if (context.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(context.calculatedDistributor);
+ if (cachedClusterState == null) {
+ reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
+ } else if (newState.getVersion() == cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
+ } else if (newState.getVersion() > cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
+ } else {
+ reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
}
}
+
public void handleErrorReply(Reply reply, Object untypedContext) {
MessageContext messageContext = (MessageContext) untypedContext;
if (messageContext.calculatedDistributor != null) {