From e2932ad73d6465a60e4621e9e074fe1e385ce7f2 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 12 Jan 2017 11:09:50 +0100 Subject: Refactor handling of WrongDistributionReply. No logic changes. --- .../messagebus/protocol/StoragePolicy.java | 149 ++++++++++++--------- 1 file changed, 87 insertions(+), 62 deletions(-) (limited to 'documentapi') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 7f8121c2138..0da97dbaa61 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -20,6 +20,7 @@ import com.yahoo.vdslib.state.State; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.logging.Logger; @@ -73,7 +74,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } public abstract String getTargetSpec(Integer distributor, RoutingContext context); public String getRandomTargetSpec(RoutingContext context) { - // Try to use list of random targets, if at least X % of the nodes are up + // Try to use list of random targets, if at least X % of the nodes are up while (100 * validRandomTargets.size() / totalTargets >= requiredUpPercentageToSendToKnownGoodNodes) { int randIndex = randomizer.nextInt(validRandomTargets.size()); String targetSpec = getTargetSpec(validRandomTargets.get(randIndex), context); @@ -269,12 +270,12 @@ public class StoragePolicy extends ExternalSlobrokPolicy { if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. try{ Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); - // If we have had too many failures towards existing node, reset failure count and send to random + // If we have had too many failures towards existing node, reset failure count and send to random if (persistentFailureChecker.tooManyFailures(target)) { sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; target = null; } - // If we have found a target, and the target exist in slobrok, send to it. + // If we have found a target, and the target exists in slobrok, send to it. if (target != null) { messageContext.calculatedDistributor = target; String targetSpec = hostFetcher.getTargetSpec(target, context); @@ -310,21 +311,86 @@ public class StoragePolicy extends ExternalSlobrokPolicy { return hostFetcher.getRandomTargetSpec(context); } - public void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { - MessageContext context = (MessageContext) routingContext.getContext(); - ClusterState newState; + private static Optional clusterStateFromReply(final WrongDistributionReply reply) { try { - newState = new ClusterState(reply.getSystemState()); + return Optional.of(new ClusterState(reply.getSystemState())); } catch (Exception e) { reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); + return Optional.empty(); + } + } + + public void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { + final MessageContext context = (MessageContext) routingContext.getContext(); + final Optional replyState = clusterStateFromReply(reply); + if (!replyState.isPresent()) { return; } + final ClusterState newState = replyState.get(); + resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); + markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); + + if ((context.calculatedDistributor == null)) { + traceReplyFromRandomDistributor(reply, newState); + } else { + traceReplyFromSpecificDistributor(reply, context, newState); + } + updateCachedRoutingStateFromWrongDistribution(context, newState); + } + + private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { + if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { + cachedClusterState = newState; + if (newState.getClusterState().equals(State.UP)) { + hostFetcher.updateValidTargets(newState); + } + } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { + cachedClusterState = null; + } else if (context.calculatedDistributor != null) { + persistentFailureChecker.addFailure(context.calculatedDistributor); + } + } + + private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState == null) { + String msg = "Used state must be set as distributor is calculated. Bug."; + reply.getTrace().trace(1, msg); + log.log(LogLevel.ERROR, msg); + } else if (newState.getVersion() == context.usedState.getVersion()) { + String msg = "Message sent to distributor " + context.calculatedDistributor + + " retrieved cluster state version " + newState.getVersion() + + " which was the state we used to calculate distributor as target last time."; + reply.getTrace().trace(1, msg); + // Client load can be rejected towards distributors even with a matching cluster state version. + // This usually happens during a node fail-over transition, where the target distributor will + // reject an operation bound to a particular bucket if it does not own the bucket in _both_ + // the current and the next (transition target) state. Since it can happen during normal operation + // and will happen per client operation, we keep this as debug level to prevent spamming the logs. + log.log(LogLevel.DEBUG, msg); + } else if (newState.getVersion() > context.usedState.getVersion()) { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " updated cluster state from version " + context.usedState.getVersion() + + " to " + newState.getVersion()); + } + } else { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " returned older cluster state version " + newState.getVersion()); + } + } + } + + private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { if (++oldClusterVersionGottenCount >= maxOldClusterVersionBeforeSendingRandom) { oldClusterVersionGottenCount = 0; cachedClusterState = null; } } + } + + private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { if (reply.getRetryDelay() <= 0.0) { reply.setRetryDelay(-1); @@ -334,64 +400,23 @@ public class StoragePolicy extends ExternalSlobrokPolicy { reply.setRetryDelay(0); } } - if (context.calculatedDistributor == null) { - if (cachedClusterState == null) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); - } - } else if (newState.getVersion() == cachedClusterState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); - } - } else if (newState.getVersion() > cachedClusterState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); - } - } else { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); - } - } - } else { - if (context.usedState == null) { - String msg = "Used state must be set as distributor is calculated. Bug."; - reply.getTrace().trace(1, msg); - log.log(LogLevel.ERROR, msg); - } else if (newState.getVersion() == context.usedState.getVersion()) { - String msg = "Message sent to distributor " + context.calculatedDistributor + - " retrieved cluster state version " + newState.getVersion() + - " which was the state we used to calculate distributor as target last time."; - reply.getTrace().trace(1, msg); - // Client load can be rejected towards distributors even with a matching cluster state version. - // This usually happens during a node fail-over transition, where the target distributor will - // reject an operation bound to a particular bucket if it does not own the bucket in _both_ - // the current and the next (transition target) state. Since it can happen during normal operation - // and will happen per client operation, we keep this as debug level to prevent spamming the logs. - log.log(LogLevel.DEBUG, msg); - } else if (newState.getVersion() > context.usedState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " updated cluster state from version " + context.usedState.getVersion() + - " to " + newState.getVersion()); - } - } else { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " returned older cluster state version " + newState.getVersion()); - } - } + } + + private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { + if (!reply.getTrace().shouldTrace(1)) { + return; } - if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { - cachedClusterState = newState; - if (newState.getClusterState().equals(State.UP)) { - hostFetcher.updateValidTargets(newState); - } - } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { - cachedClusterState = null; - } else if (context.calculatedDistributor != null) { - persistentFailureChecker.addFailure(context.calculatedDistributor); + if (cachedClusterState == null) { + reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); + } else if (newState.getVersion() == cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); + } else if (newState.getVersion() > cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); + } else { + reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); } } + public void handleErrorReply(Reply reply, Object untypedContext) { MessageContext messageContext = (MessageContext) untypedContext; if (messageContext.calculatedDistributor != null) { -- cgit v1.2.3 From ab968a8b15674d569acb9b3faecd053489745ebd Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 12 Jan 2017 11:13:38 +0100 Subject: Remove unnecessary double parenthesis. --- .../java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'documentapi') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 0da97dbaa61..de0ce196678 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -330,7 +330,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); - if ((context.calculatedDistributor == null)) { + if (context.calculatedDistributor == null) { traceReplyFromRandomDistributor(reply, newState); } else { traceReplyFromSpecificDistributor(reply, context, newState); -- cgit v1.2.3