From 75b2e4c11ea6463c335f1c77dab3fdb5493e5600 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Tue, 5 Jan 2021 12:29:17 +0100 Subject: Revert "Jonmv/remove storage policy" --- .../messagebus/MessageBusVisitorSession.java | 2 +- .../messagebus/protocol/ContentPolicy.java | 602 +------------------- .../messagebus/protocol/DocumentProtocol.java | 2 +- .../protocol/RoutingPolicyFactories.java | 9 + .../messagebus/protocol/StoragePolicy.java | 615 +++++++++++++++++++++ .../TargetCachingSlobrokHostFetcherTest.java | 4 +- .../protocol/test/storagepolicy/BasicTests.java | 2 +- .../test/storagepolicy/ContentPolicyTest.java | 128 ----- .../ContentPolicyTestEnvironment.java | 238 -------- .../protocol/test/storagepolicy/Simulator.java | 6 +- .../test/storagepolicy/StoragePolicyTest.java | 128 +++++ .../StoragePolicyTestEnvironment.java | 238 ++++++++ documentapi/src/tests/policies/policies_test.cpp | 48 +- .../documentapi/messagebus/documentprotocol.cpp | 6 +- .../documentapi/messagebus/policies/CMakeLists.txt | 1 + .../messagebus/policies/contentpolicy.cpp | 244 +------- .../messagebus/policies/contentpolicy.h | 53 +- .../messagebus/policies/storagepolicy.cpp | 257 +++++++++ .../messagebus/policies/storagepolicy.h | 63 +++ .../messagebus/routingpolicyfactories.cpp | 18 +- .../messagebus/routingpolicyfactories.h | 4 + 21 files changed, 1387 insertions(+), 1281 deletions(-) create mode 100644 documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java delete mode 100644 documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java delete mode 100644 documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java create mode 100644 documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java create mode 100644 
documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java create mode 100644 documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp create mode 100644 documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h (limited to 'documentapi/src') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 982a1c50b85..257d491ea93 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -917,7 +917,7 @@ public class MessageBusVisitorSession implements VisitorSession { } } if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) { - handleWrongDistributionReply((WrongDistributionReply) reply); + handleWrongDistributionReply((WrongDistributionReply)reply); } else { if (shouldReportError(reply)) { reportVisitorError(message); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java index 2d78497456d..d6e20b9d57f 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java @@ -1,613 +1,43 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package com.yahoo.documentapi.messagebus.protocol; -import com.yahoo.concurrent.CopyOnWriteHashMap; -import com.yahoo.document.BucketId; -import com.yahoo.document.BucketIdFactory; -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.jrt.slobrok.api.Mirror; -import java.util.logging.Level; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.routing.Hop; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNodeIterator; -import com.yahoo.messagebus.routing.VerbatimDirective; -import com.yahoo.vdslib.distribution.Distribution; -import com.yahoo.vdslib.state.ClusterState; -import com.yahoo.vdslib.state.Node; -import com.yahoo.vdslib.state.NodeType; -import com.yahoo.vdslib.state.State; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; /** - * Routing policy to determine which distributor in a content cluster to send data to. - * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. - * - * cluster=[clusterName] (Mandatory, determines the cluster name) - * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application) - * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) - * - * @author Haakon Humberset + * Policy to talk to content clusters. 
*/ -public class ContentPolicy extends SlobrokPolicy { - - private static final Logger log = Logger.getLogger(ContentPolicy.class.getName()); - public static final String owningBucketStates = "uim"; - private static final String upStates = "ui"; - - /** This class merely generates a slobrok host pattern for a given distributor. */ - static class SlobrokHostPatternGenerator { - - private final String base; - - SlobrokHostPatternGenerator(String clusterName) { - this.base = "storage/cluster." + clusterName + "/distributor/"; - } - - /** - * Find host pattern of the hosts that are valid targets for this request. - * - * @param distributor Set to null if any distributor is valid target. - */ - String getDistributorHostPattern(Integer distributor) { - return base + (distributor == null ? "*" : distributor) + "/default"; - } - - } - - /** Helper class to match a host pattern with node to use. */ - public abstract static class HostFetcher { - - private static class Targets { - private final List list; - private final int total; - Targets() { - this(Collections.emptyList(), 1); - } - Targets(List list, int total) { - this.list = list; - this.total = total; - } - } - - private final int requiredUpPercentageToSendToKnownGoodNodes; - private final AtomicReference validTargets = new AtomicReference<>(new Targets()); - protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. 
- - protected HostFetcher(int percent) { - requiredUpPercentageToSendToKnownGoodNodes = percent; - } - - void updateValidTargets(ClusterState state) { - List validRandomTargets = new ArrayList<>(); - for (int i=0; i(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); - } - public abstract String getTargetSpec(Integer distributor, RoutingContext context); - String getRandomTargetSpec(RoutingContext context) { - Targets targets = validTargets.get(); - // Try to use list of random targets, if at least X % of the nodes are up - while ((targets.total != 0) && - (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) - { - int randIndex = randomizer.nextInt(targets.list.size()); - String targetSpec = getTargetSpec(targets.list.get(randIndex), context); - if (targetSpec != null) { - context.trace(3, "Sending to random node seen up in cluster state"); - return targetSpec; - } - targets.list.remove(randIndex); - } - context.trace(3, "Too few nodes seen up in state. Sending totally random."); - return getTargetSpec(null, context); - } - public void close() {} - } - - /** Host fetcher using a slobrok mirror to find the hosts. 
*/ - public static class SlobrokHostFetcher extends HostFetcher { - private final SlobrokHostPatternGenerator patternGenerator; - private final SlobrokPolicy policy; +public class ContentPolicy extends StoragePolicy { - SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(percent); - this.patternGenerator = patternGenerator; - this.policy = policy; - } + public static class ContentParameters extends Parameters { - private List getEntries(String hostPattern, RoutingContext context) { - return policy.lookup(context, hostPattern); + public ContentParameters(Map parameters) { + super(parameters); } - private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } - - public IMirror getMirror(RoutingContext context) { return context.getMirror(); } - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - List arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); - if (arr.isEmpty()) return null; - if (distributor != null) { - if (arr.size() == 1) { - return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); - } else { - log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); - } - } else { - return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); - } - return null; - } - } - - static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { - - /** - * Distributor index to resolved RPC spec cache for a single given Slobrok - * update generation. Uses a thread safe COW map which will grow until stable. 
- */ - private static class GenerationCache { - private final int generation; - private final CopyOnWriteHashMap targets = new CopyOnWriteHashMap<>(); - - GenerationCache(int generation) { - this.generation = generation; - } - - public int generation() { return this.generation; } - - public String get(Integer index) { - return targets.get(index); - } - public void put(Integer index, String target) { - targets.put(index, target); + public String getDistributionConfigId() { + if (distributionConfigId != null) { + return distributionConfigId; } - } - - private final AtomicReference generationCache = new AtomicReference<>(null); - - TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(patternGenerator, policy, percent); + return clusterName; } @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - GenerationCache cache = generationCache.get(); - int currentGeneration = getMirror(context).updates(); - // The below code might race with other threads during a generation change. That is OK, as the cache - // is thread safe and will quickly converge to a stable state for the new generation. - if (cache == null || currentGeneration != cache.generation()) { - cache = new GenerationCache(currentGeneration); - generationCache.set(cache); - } - if (distributor != null) { - return cachingGetTargetSpec(distributor, context, cache); - } - // Wildcard lookup case. Must not be cached. - return super.getTargetSpec(null, context); - } - - private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { - String cachedTarget = cache.get(distributor); - if (cachedTarget != null) { - return cachedTarget; - } - // Mirror _may_ be at a higher version if we race with generation read, but that is OK since - // we'll either way get the most up-to-date mapping and the cache will be invalidated on the - // next invocation. 
- String resolvedTarget = super.getTargetSpec(distributor, context); - cache.put(distributor, resolvedTarget); - return resolvedTarget; - } - - } - - /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ - public static class Parameters { - protected final String clusterName; - protected final String distributionConfigId; - protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; - - public Parameters(Map params) { - clusterName = params.get("cluster"); - distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove - slobrokHostPatternGenerator = createPatternGenerator(); - if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); - } - - String getDistributionConfigId() { - return distributionConfigId == null ? clusterName : distributionConfigId; - } - public String getClusterName() { - return clusterName; - } public SlobrokHostPatternGenerator createPatternGenerator() { - return new SlobrokHostPatternGenerator(getClusterName()); - } - public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { - return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); - } - public Distribution createDistribution(SlobrokPolicy policy) { - return new Distribution(getDistributionConfigId()); - } - - /** - * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the - * failure was related to node being bad. (Hard to detect from failure) - */ - int getAttemptRandomOnFailuresLimit() { return 5; } - - /** - * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the - * old one. This guards us against version resets. 
- */ - int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } - - /** - * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. - * (To avoid hitting trashing bad nodes still in slobrok) - */ - int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } - } - - /** Helper class to get the bucket identifier of a message. */ - public static class BucketIdCalculator { - private static final BucketIdFactory factory = new BucketIdFactory(); - - private BucketId getBucketId(Message msg) { - switch (msg.getType()) { - case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); - case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); - case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); - case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); - default: - log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); - return null; - } - } - - BucketId handleBucketIdCalculation(RoutingContext context) { - BucketId id = getBucketId(context.getMessage()); - if (id == null || id.getRawId() == 0) { - Reply reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); - context.setReply(reply); - } - return id; - } - } - - /** 
Class handling the logic of picking a distributor */ - public static class DistributorSelectionLogic { - /** Class that tracks a failure of a given type per node. */ - static class InstabilityChecker { - private final List nodeFailures = new CopyOnWriteArrayList<>(); - private final int failureLimit; - - InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } - - boolean tooManyFailures(int nodeIndex) { - if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { - nodeFailures.set(nodeIndex, 0); - return true; - } else { - return false; - } - } - - void addFailure(Integer calculatedDistributor) { - while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); - nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); - } - } - /** Message context class. Contains data we want to inspect about a request at reply time. */ - private static class MessageContext { - final Integer calculatedDistributor; - final ClusterState usedState; - - MessageContext(ClusterState usedState) { - this(usedState, null); - } - MessageContext(ClusterState usedState, Integer calculatedDistributor) { - this.calculatedDistributor = calculatedDistributor; - this.usedState = usedState; - } - - public String toString() { - return "Context(Distributor " + calculatedDistributor + - ", state version " + usedState.getVersion() + ")"; - } - } - - private final HostFetcher hostFetcher; - private final Distribution distribution; - private final InstabilityChecker persistentFailureChecker; - private final AtomicReference safeCachedClusterState = new AtomicReference<>(null); - private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); - private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection - - DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { - try { - hostFetcher = params.createHostFetcher(policy, 
params.getRequiredUpPercentageToSendToKnownGoodNodes()); - distribution = params.createDistribution(policy); - persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); - maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); - } catch (Throwable e) { - destroy(); - throw e; - } - } - - public void destroy() { - if (hostFetcher != null) { - hostFetcher.close(); - } - if (distribution != null) { - distribution.close(); - } - } - - String getTargetSpec(RoutingContext context, BucketId bucketId) { - String sendRandomReason = null; - ClusterState cachedClusterState = safeCachedClusterState.get(); - - if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. - try{ - Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); - // If we have had too many failures towards existing node, reset failure count and send to random - if (persistentFailureChecker.tooManyFailures(target)) { - sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; - target = null; - } - // If we have found a target, and the target exists in slobrok, send to it. - if (target != null) { - context.setContext(new MessageContext(cachedClusterState, target)); - String targetSpec = hostFetcher.getTargetSpec(target, context); - if (targetSpec != null) { - if (context.shouldTrace(1)) { - context.trace(1, "Using distributor " + target + " for " + - bucketId + " as our state version is " + cachedClusterState.getVersion()); - } - return targetSpec; - } else { - sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. 
Sending to random."; - log.log(Level.FINE, "Target distributor is not in slobrok"); - } - } else { - context.setContext(new MessageContext(cachedClusterState)); - } - } catch (Distribution.TooFewBucketBitsInUseException e) { - Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); - reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(reply); - return null; - } catch (Distribution.NoDistributorsAvailableException e) { - log.log(Level.FINE, "No distributors available; clearing cluster state"); - safeCachedClusterState.set(null); - sendRandomReason = "No distributors available. Sending to random distributor."; - context.setContext(createRandomDistributorTargetContext()); - } - } else { - context.setContext(createRandomDistributorTargetContext()); - sendRandomReason = "No cluster state cached. Sending to random distributor."; - } - if (context.shouldTrace(1)) { - context.trace(1, sendRandomReason != null ? 
sendRandomReason : "Sending to random distributor for unknown reason"); - } - return hostFetcher.getRandomTargetSpec(context); - } - - private static MessageContext createRandomDistributorTargetContext() { - return new MessageContext(null); - } - - private static Optional clusterStateFromReply(final WrongDistributionReply reply) { - try { - return Optional.of(new ClusterState(reply.getSystemState())); - } catch (Exception e) { - reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); - return Optional.empty(); - } - } - - void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { - final MessageContext context = (MessageContext) routingContext.getContext(); - final Optional replyState = clusterStateFromReply(reply); - if (!replyState.isPresent()) { - return; - } - final ClusterState newState = replyState.get(); - resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); - markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); - - if (context.calculatedDistributor == null) { - traceReplyFromRandomDistributor(reply, newState); - } else { - traceReplyFromSpecificDistributor(reply, context, newState); - } - updateCachedRoutingStateFromWrongDistribution(context, newState); - } - - private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { - safeCachedClusterState.set(newState); - if (newState.getClusterState().equals(State.UP)) { - hostFetcher.updateValidTargets(newState); - } - } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { - safeCachedClusterState.set(null); - } else if (context.calculatedDistributor != null) { - persistentFailureChecker.addFailure(context.calculatedDistributor); - } - } - - private void 
traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState == null) { - String msg = "Used state must be set as distributor is calculated. Bug."; - reply.getTrace().trace(1, msg); - log.log(Level.SEVERE, msg); - } else if (newState.getVersion() == context.usedState.getVersion()) { - String msg = "Message sent to distributor " + context.calculatedDistributor + - " retrieved cluster state version " + newState.getVersion() + - " which was the state we used to calculate distributor as target last time."; - reply.getTrace().trace(1, msg); - // Client load can be rejected towards distributors even with a matching cluster state version. - // This usually happens during a node fail-over transition, where the target distributor will - // reject an operation bound to a particular bucket if it does not own the bucket in _both_ - // the current and the next (transition target) state. Since it can happen during normal operation - // and will happen per client operation, we keep this as debug level to prevent spamming the logs. 
- log.log(Level.FINE, msg); - } else if (newState.getVersion() > context.usedState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " updated cluster state from version " + context.usedState.getVersion() + - " to " + newState.getVersion()); - } - } else { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " returned older cluster state version " + newState.getVersion()); - } - } - } - - private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { - if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { - oldClusterVersionGottenCount.set(0); - safeCachedClusterState.set(null); - } - } - } - - private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(-1); - } - } else { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(0); - } - } - } - - private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { - if (!reply.getTrace().shouldTrace(1)) { - return; - } - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null) { - reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); - } else if (newState.getVersion() == cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); - } else if 
(newState.getVersion() > cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); - } else { - reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); - } - } - - void handleErrorReply(Reply reply, Object untypedContext) { - MessageContext messageContext = (MessageContext) untypedContext; - if (messageContext.calculatedDistributor != null) { - persistentFailureChecker.addFailure(messageContext.calculatedDistributor); - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Failed with " + messageContext.toString()); + return new SlobrokHostPatternGenerator(getClusterName()) { + public String getDistributorHostPattern(Integer distributor) { + return "storage/cluster." + getClusterName() + "/distributor/" + (distributor == null ? "*" : distributor) + "/default"; } - } + }; } } - private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); - private final DistributorSelectionLogic distributorSelectionLogic; - private final Parameters parameters; - - /** Constructor used in production. 
*/ - public ContentPolicy(String param) { - this(parse(param)); - } - public ContentPolicy(Map params) { - this(new Parameters(params)); + super(new ContentParameters(params)); } - /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ - public ContentPolicy(Parameters p) { - super(); - parameters = p; - distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); + public ContentPolicy(String parameters) { + this(parse(parameters)); } - @Override - public void select(RoutingContext context) { - if (context.shouldTrace(1)) { - context.trace(1, "Selecting route"); - } - - BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); - if (context.hasReply()) return; - - String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); - if (context.hasReply()) return; - if (targetSpec != null) { - Route route = new Route(context.getRoute()); - route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); - context.addChild(route); - } else { - context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "Could not resolve any distributors to send to in cluster " + parameters.clusterName); - } - } - - @Override - public void merge(RoutingContext context) { - RoutingNodeIterator it = context.getChildIterator(); - Reply reply = (it.hasReply()) ? 
it.removeReply() : context.getReply(); - if (reply == null) { - reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "No reply in any children, nor in the routing context: " + context)); - } - - if (reply instanceof WrongDistributionReply) { - distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); - } else if (reply.hasErrors()) { - distributorSelectionLogic.handleErrorReply(reply, context.getContext()); - } else if (reply instanceof WriteDocumentReply) { - if (context.shouldTrace(9)) { - context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); - } - } - context.setReply(reply); - } - - @Override - public void destroy() { - distributorSelectionLogic.destroy(); - } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java index f5b4920fa3f..ca32c5722e6 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java @@ -263,7 +263,7 @@ public class DocumentProtocol implements Protocol { putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg)); putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory()); putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory()); - putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory()); + putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory()); putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory()); // Prepare version specifications to use when adding routable factories. 
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java index 7b44a1a4f0d..6954d8f3a1d 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java @@ -16,6 +16,15 @@ public abstract class RoutingPolicyFactories { } } + static class StoragePolicyFactory implements RoutingPolicyFactory { + public DocumentProtocolRoutingPolicy createPolicy(String param) { + return new StoragePolicy(param); + } + + public void destroy() { + } + } + static class ContentPolicyFactory implements RoutingPolicyFactory { public DocumentProtocolRoutingPolicy createPolicy(String param) { return new ContentPolicy(param); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java new file mode 100644 index 00000000000..b74f7431531 --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -0,0 +1,615 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.document.BucketId; +import com.yahoo.document.BucketIdFactory; +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingNodeIterator; +import com.yahoo.messagebus.routing.VerbatimDirective; +import com.yahoo.vdslib.distribution.Distribution; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * Routing policy to determine which distributor in a storage cluster to send data to. + * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. + * + * cluster=[clusterName] (Mandatory, determines the cluster name) + * config=[config] (Optional, a comma separated list of config servers to use. 
Used to talk to clusters not defined in this vespa application) + * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) + * + * @author Haakon Humberset + */ +public class StoragePolicy extends SlobrokPolicy { + + private static final Logger log = Logger.getLogger(StoragePolicy.class.getName()); + public static final String owningBucketStates = "uim"; + private static final String upStates = "ui"; + + /** This class merely generates a slobrok host pattern for a given distributor. */ + public static class SlobrokHostPatternGenerator { + private final String base; + private final String all; + SlobrokHostPatternGenerator(String clusterName) { + base = "storage/cluster." + clusterName + "/distributor/"; + all = base + "*/default"; + + } + + /** + * Find host pattern of the hosts that are valid targets for this request. + * @param distributor Set to null if any distributor is a valid target. + */ + public String getDistributorHostPattern(Integer distributor) { + return (distributor == null) ? all : (base + distributor + "/default"); + } + } + + /** Helper class to match a host pattern with node to use. */ + public abstract static class HostFetcher { + + private static class Targets { + private final List list; + private final int total; + Targets() { + this(Collections.emptyList(), 1); + } + Targets(List list, int total) { + this.list = list; + this.total = total; + } + } + + private final int requiredUpPercentageToSendToKnownGoodNodes; + private final AtomicReference validTargets = new AtomicReference<>(new Targets()); + protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. 
+ + protected HostFetcher(int percent) { + requiredUpPercentageToSendToKnownGoodNodes = percent; + } + + void updateValidTargets(ClusterState state) { + List validRandomTargets = new ArrayList<>(); + for (int i=0; i(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); + } + public abstract String getTargetSpec(Integer distributor, RoutingContext context); + String getRandomTargetSpec(RoutingContext context) { + Targets targets = validTargets.get(); + // Try to use list of random targets, if at least X % of the nodes are up + while ((targets.total != 0) && + (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) + { + int randIndex = randomizer.nextInt(targets.list.size()); + String targetSpec = getTargetSpec(targets.list.get(randIndex), context); + if (targetSpec != null) { + context.trace(3, "Sending to random node seen up in cluster state"); + return targetSpec; + } + targets.list.remove(randIndex); + } + context.trace(3, "Too few nodes seen up in state. Sending totally random."); + return getTargetSpec(null, context); + } + public void close() {} + } + + /** Host fetcher using a slobrok mirror to find the hosts. 
*/ + public static class SlobrokHostFetcher extends HostFetcher { + private final SlobrokHostPatternGenerator patternGenerator; + private final SlobrokPolicy policy; + + SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(percent); + this.patternGenerator = patternGenerator; + this.policy = policy; + } + + private List getEntries(String hostPattern, RoutingContext context) { + return policy.lookup(context, hostPattern); + } + + private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } + + public IMirror getMirror(RoutingContext context) { return context.getMirror(); } + + @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + List arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); + if (arr.isEmpty()) return null; + if (distributor != null) { + if (arr.size() == 1) { + return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); + } else { + log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); + } + } else { + return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); + } + return null; + } + } + + static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { + + /** + * Distributor index to resolved RPC spec cache for a single given Slobrok + * update generation. Uses a thread safe COW map which will grow until stable. 
+ */ + private static class GenerationCache { + private final int generation; + private final CopyOnWriteHashMap targets = new CopyOnWriteHashMap<>(); + + GenerationCache(int generation) { + this.generation = generation; + } + + public int generation() { return this.generation; } + + public String get(Integer index) { + return targets.get(index); + } + public void put(Integer index, String target) { + targets.put(index, target); + } + } + + private final AtomicReference generationCache = new AtomicReference<>(null); + + TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(patternGenerator, policy, percent); + } + + @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + GenerationCache cache = generationCache.get(); + int currentGeneration = getMirror(context).updates(); + // The below code might race with other threads during a generation change. That is OK, as the cache + // is thread safe and will quickly converge to a stable state for the new generation. + if (cache == null || currentGeneration != cache.generation()) { + cache = new GenerationCache(currentGeneration); + generationCache.set(cache); + } + if (distributor != null) { + return cachingGetTargetSpec(distributor, context, cache); + } + // Wildcard lookup case. Must not be cached. + return super.getTargetSpec(null, context); + } + + private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { + String cachedTarget = cache.get(distributor); + if (cachedTarget != null) { + return cachedTarget; + } + // Mirror _may_ be at a higher version if we race with generation read, but that is OK since + // we'll either way get the most up-to-date mapping and the cache will be invalidated on the + // next invocation. 
+ String resolvedTarget = super.getTargetSpec(distributor, context); + cache.put(distributor, resolvedTarget); + return resolvedTarget; + } + + } + + /** Class that parses the semicolon separated parameter string and exposes the appropriate values to the policy. */ + public static class Parameters { + protected final String clusterName; + protected final String distributionConfigId; + protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; + + public Parameters(Map params) { + clusterName = params.get("cluster"); + distributionConfigId = params.get("clusterconfigid"); + slobrokHostPatternGenerator = createPatternGenerator(); + if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); + } + + String getDistributionConfigId() { + return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId); + } + public String getClusterName() { + return clusterName; + } + public SlobrokHostPatternGenerator createPatternGenerator() { + return new SlobrokHostPatternGenerator(getClusterName()); + } + public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { + return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); + } + public Distribution createDistribution(SlobrokPolicy policy) { + return new Distribution(getDistributionConfigId()); + } + + /** + * When we have gotten this many failures from a node (any kind of failure), we try to send to a random other node, just to see if the + * failure was related to the node being bad. (Hard to detect from the failure itself.) + */ + int getAttemptRandomOnFailuresLimit() { return 5; } + + /** + * If we receive more than this number of wrong distribution replies with old cluster states, we throw away the current cached state and take the + * old one. This guards us against version resets. 
+ */ + int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } + + /** + * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. + * (To avoid hitting trashing bad nodes still in slobrok) + */ + int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } + } + + /** Helper class to get the bucket identifier of a message. */ + public static class BucketIdCalculator { + private static final BucketIdFactory factory = new BucketIdFactory(); + + private BucketId getBucketId(Message msg) { + switch (msg.getType()) { + case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); + case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); + case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); + case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); + default: + log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); + return null; + } + } + + BucketId handleBucketIdCalculation(RoutingContext context) { + BucketId id = getBucketId(context.getMessage()); + if (id == null || id.getRawId() == 0) { + Reply reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); + context.setReply(reply); + } + return id; + } + } + + /** 
Class handling the logic of picking a distributor */ + public static class DistributorSelectionLogic { + /** Class that tracks a failure of a given type per node. */ + static class InstabilityChecker { + private final List nodeFailures = new CopyOnWriteArrayList<>(); + private final int failureLimit; + + InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } + + boolean tooManyFailures(int nodeIndex) { + if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { + nodeFailures.set(nodeIndex, 0); + return true; + } else { + return false; + } + } + + void addFailure(Integer calculatedDistributor) { + while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); + nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); + } + } + /** Message context class. Contains data we want to inspect about a request at reply time. */ + private static class MessageContext { + final Integer calculatedDistributor; + final ClusterState usedState; + + MessageContext() { + this(null, null); + } + MessageContext(ClusterState usedState) { + this(usedState, null); + } + MessageContext(ClusterState usedState, Integer calculatedDistributor) { + this.calculatedDistributor = calculatedDistributor; + this.usedState = usedState; + } + + public String toString() { + return "Context(Distributor " + calculatedDistributor + + ", state version " + usedState.getVersion() + ")"; + } + } + + private final HostFetcher hostFetcher; + private final Distribution distribution; + private final InstabilityChecker persistentFailureChecker; + private final AtomicReference safeCachedClusterState = new AtomicReference<>(null); + private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); + private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection + + DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { + try { + hostFetcher = 
params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); + distribution = params.createDistribution(policy); + persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); + maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); + } catch (Throwable e) { + destroy(); + throw e; + } + } + + public void destroy() { + if (hostFetcher != null) { + hostFetcher.close(); + } + if (distribution != null) { + distribution.close(); + } + } + + String getTargetSpec(RoutingContext context, BucketId bucketId) { + String sendRandomReason = null; + ClusterState cachedClusterState = safeCachedClusterState.get(); + + if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. + try{ + Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); + // If we have had too many failures towards existing node, reset failure count and send to random + if (persistentFailureChecker.tooManyFailures(target)) { + sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; + target = null; + } + // If we have found a target, and the target exists in slobrok, send to it. + if (target != null) { + context.setContext(new MessageContext(cachedClusterState, target)); + String targetSpec = hostFetcher.getTargetSpec(target, context); + if (targetSpec != null) { + if (context.shouldTrace(1)) { + context.trace(1, "Using distributor " + target + " for " + + bucketId + " as our state version is " + cachedClusterState.getVersion()); + } + return targetSpec; + } else { + sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. 
Sending to random."; + log.log(Level.FINE, "Target distributor is not in slobrok"); + } + } else { + context.setContext(new MessageContext(cachedClusterState)); + } + } catch (Distribution.TooFewBucketBitsInUseException e) { + Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); + reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(reply); + return null; + } catch (Distribution.NoDistributorsAvailableException e) { + log.log(Level.FINE, "No distributors available; clearing cluster state"); + safeCachedClusterState.set(null); + sendRandomReason = "No distributors available. Sending to random distributor."; + context.setContext(createRandomDistributorTargetContext()); + } + } else { + context.setContext(createRandomDistributorTargetContext()); + sendRandomReason = "No cluster state cached. Sending to random distributor."; + } + if (context.shouldTrace(1)) { + context.trace(1, sendRandomReason != null ? 
sendRandomReason : "Sending to random distributor for unknown reason"); + } + return hostFetcher.getRandomTargetSpec(context); + } + + private static MessageContext createRandomDistributorTargetContext() { + return new MessageContext(null); + } + + private static Optional clusterStateFromReply(final WrongDistributionReply reply) { + try { + return Optional.of(new ClusterState(reply.getSystemState())); + } catch (Exception e) { + reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); + return Optional.empty(); + } + } + + void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { + final MessageContext context = (MessageContext) routingContext.getContext(); + final Optional replyState = clusterStateFromReply(reply); + if (!replyState.isPresent()) { + return; + } + final ClusterState newState = replyState.get(); + resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); + markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); + + if (context.calculatedDistributor == null) { + traceReplyFromRandomDistributor(reply, newState); + } else { + traceReplyFromSpecificDistributor(reply, context, newState); + } + updateCachedRoutingStateFromWrongDistribution(context, newState); + } + + private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { + safeCachedClusterState.set(newState); + if (newState.getClusterState().equals(State.UP)) { + hostFetcher.updateValidTargets(newState); + } + } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { + safeCachedClusterState.set(null); + } else if (context.calculatedDistributor != null) { + persistentFailureChecker.addFailure(context.calculatedDistributor); + } + } + + private void 
traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState == null) { + String msg = "Used state must be set as distributor is calculated. Bug."; + reply.getTrace().trace(1, msg); + log.log(Level.SEVERE, msg); + } else if (newState.getVersion() == context.usedState.getVersion()) { + String msg = "Message sent to distributor " + context.calculatedDistributor + + " retrieved cluster state version " + newState.getVersion() + + " which was the state we used to calculate distributor as target last time."; + reply.getTrace().trace(1, msg); + // Client load can be rejected towards distributors even with a matching cluster state version. + // This usually happens during a node fail-over transition, where the target distributor will + // reject an operation bound to a particular bucket if it does not own the bucket in _both_ + // the current and the next (transition target) state. Since it can happen during normal operation + // and will happen per client operation, we keep this as debug level to prevent spamming the logs. 
+ log.log(Level.FINE, msg); + } else if (newState.getVersion() > context.usedState.getVersion()) { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " updated cluster state from version " + context.usedState.getVersion() + + " to " + newState.getVersion()); + } + } else { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " returned older cluster state version " + newState.getVersion()); + } + } + } + + private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { + if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { + oldClusterVersionGottenCount.set(0); + safeCachedClusterState.set(null); + } + } + } + + private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(-1); + } + } else { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(0); + } + } + } + + private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { + if (!reply.getTrace().shouldTrace(1)) { + return; + } + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null) { + reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); + } else if (newState.getVersion() == cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); + } else if 
(newState.getVersion() > cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); + } else { + reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); + } + } + + void handleErrorReply(Reply reply, Object untypedContext) { + MessageContext messageContext = (MessageContext) untypedContext; + if (messageContext.calculatedDistributor != null) { + persistentFailureChecker.addFailure(messageContext.calculatedDistributor); + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Failed with " + messageContext.toString()); + } + } + } + } + + private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); + private final DistributorSelectionLogic distributorSelectionLogic; + private final Parameters parameters; + + /** Constructor used in production. */ + public StoragePolicy(String param) { + this(parse(param)); + } + + public StoragePolicy(Map params) { + this(new Parameters(params)); + } + + /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ + public StoragePolicy(Parameters p) { + super(); + parameters = p; + distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); + } + + @Override + public void select(RoutingContext context) { + if (context.shouldTrace(1)) { + context.trace(1, "Selecting route"); + } + + BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); + if (context.hasReply()) return; + + String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); + if (context.hasReply()) return; + if (targetSpec != null) { + Route route = new Route(context.getRoute()); + route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); + context.addChild(route); + } else { + context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "Could not resolve any distributors to send to in 
cluster " + parameters.clusterName); + } + } + + @Override + public void merge(RoutingContext context) { + RoutingNodeIterator it = context.getChildIterator(); + Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply(); + if (reply == null) { + reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "No reply in any children, nor in the routing context: " + context)); + } + + if (reply instanceof WrongDistributionReply) { + distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); + } else if (reply.hasErrors()) { + distributorSelectionLogic.handleErrorReply(reply, context.getContext()); + } else if (reply instanceof WriteDocumentReply) { + if (context.shouldTrace(9)) { + context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); + } + } + context.setReply(reply); + } + + @Override + public void destroy() { + distributorSelectionLogic.destroy(); + } +} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java index 52afcdfd77c..cdc5878321a 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -49,8 +49,8 @@ public class TargetCachingSlobrokHostFetcherTest { static class Fixture { SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class); IMirror mockMirror = mock(IMirror.class); - ContentPolicy.SlobrokHostPatternGenerator patternGenerator = new ContentPolicy.SlobrokHostPatternGenerator("foo"); - ContentPolicy.TargetCachingSlobrokHostFetcher hostFetcher = new ContentPolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); + 
StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); + StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); RoutingContext routingContext = mock(RoutingContext.class); Fixture() { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java index 018697c0719..8a6df061430 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java @@ -15,7 +15,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -public class BasicTests extends ContentPolicyTestEnvironment { +public class BasicTests extends StoragePolicyTestEnvironment { /** Test that we can send a message through the policy. */ @Test diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java deleted file mode 100644 index f324245b612..00000000000 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy; - -import org.junit.Ignore; -import org.junit.Test; - -public class ContentPolicyTest extends Simulator { - /** - * Verify that a resent message with failures doesn't ruin overall performance. 
(By dumping the cached state too often - * so other requests are sent to wrong target) - * Lets one node always fail message with transient error. - */ - @Test - @Ignore // FIXME test has been implicitly disabled for ages, figure out and fix - public void testPersistentFailureTransientError() { - runSimulation("First correctnode 99, wrongnode 1, downnode 0, worked 90, failed 10 " - + "Last correctnode 99, wrongnode 1, downnode 0, worked 92, failed 8", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.TRANSIENT_ERROR))); - } - /** - * Verify that a resent message with failures doesn't ruin overall performance. (By dumping the cached state too often - * so other requests are sent to wrong target) - * Lets one node always fail message with fatal error. - */ - @Test - @Ignore // FIXME test has been implicitly disabled for ages, figure out and fix - public void testPersistentFailureFatalError() { - runSimulation("First correctnode 99, wrongnode 1, downnode 0, worked 90, failed 10 " - + "Last correctnode 99, wrongnode 1, downnode 0, worked 92, failed 8", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.FATAL_ERROR))); - } - /** - * Verify that a node responding with old cluster state doesn't ruin overall performance (By dumping/switching cached - * state too often) - * Let one node reporting an old cluster state (but node is still set up in fleetcontroller state). - * We expect some requests to go to wrong node due to this issue, but the majority of requests should be unaffected. 
- */ - @Test - public void testPersistentFailureOldClusterState() { - runSimulation("First correctnode .*, wrongnode .*, downnode .*, worked .*, failed .* " - + "Last correctnode 100, wrongnode 0, downnode 0, worked 100, failed 0", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.OLD_CLUSTER_STATE).setDownInCurrentState())); - } - /** - * Verify that a reset cluster state version doesn't keep sending requests to the wrong node. - * We expect a few failures in first half. We should have detected the issue before second half, so there all should be fine. - */ - @Test - public void testPersistentFailureResetClusterState() { - // If reset detection works (within the few messages sent in test), we should not fail any requests or send to wrong nodes in second half - runSimulation("First correctnode .*, wrongnode .*, downnode .*, worked .*, failed .* " - + "Last correctnode .*, wrongnode 0, downnode 0, worked .*, failed 0", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.RESET_CLUSTER_STATE).setDownInCurrentState())); - } - /** - * Verify that a reset cluster state version doesn't keep sending requests to the wrong node. - * We expect a few failures in first half. We should have detected the issue before second half, so there all should be fine. - */ - @Test - public void testPersistentFailureResetClusterStateNoGoodNodes() { - // If reset detection works (within the few messages sent in test), we should not fail any requests in second half. - - // Current problem here, is that even though we from time to time will send requests to other nodes, and will eventually throw the faulty cluster state, - // we will have pending operations towards this distributor when it happens, so it very quickly returns into a bad state. - - // This issue should hopefully not be that critical as we don't expect nodes to stay up and report erronious states. 
Even nodes that are down do get the - // cluster states sent to them, and if that doesn't work, how do the client manage to talk to them? - - runSimulation("First correctnode .*, wrongnode .*, downnode .*, worked .*, failed .* " - + "Last correctnode .*, wrongnode 100, downnode 100, worked 0, failed 100", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.RESET_CLUSTER_STATE_NO_GOOD_NODES).setDownInCurrentState())); - } - /** - * Verify that a reset cluster state version doesn't keep sending requests to the wrong node. - * We expect a few failures in first half. We should have detected the issue before second half, so there all should be fine. - */ - @Test - @Ignore // FIXME test has been implicitly disabled for ages, figure out and fix - public void testPersistentFailureResetClusterStateNoGoodNodesNotMarkedDown() { - // If reset detection works (within the few messages sent in test), we should not fail any requests in second half. - - // This is just as sad as the above. Even if the node got detected to be screwed, we'd still be in the setting above. We don't expect nodes - // to get into this state however. - - runSimulation("First correctnode .*, wrongnode .*, downnode .*, worked .*, failed .* " - + "Last correctnode .*, wrongnode 91, downnode 0, worked 0, failed 100", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.RESET_CLUSTER_STATE_NO_GOOD_NODES))); - } - /** - * Verify that a reset cluster state version doesn't keep sending requests to the wrong node. - * Another scenario where we have a node coming up in correct state. - * We expect a few failures in first half. We should have detected the issue before second half, so there all should be fine. 
- */ - @Test - public void testPersistentFailureResetClusterStateNewNodeUp() { - // If we handled this well, we should have no failing requests, and no requests to down node in second half - runSimulation("First correctnode .*, wrongnode .*, downnode .*, worked .*, failed .* " - + "Last correctnode .*, wrongnode 0, downnode 0, worked .*, failed 0", - new PersistentFailureTestParameters().newNodeAdded().addBadNode(new BadNode(3, FailureType.RESET_CLUSTER_STATE).setDownInCurrentState())); - } - /** Test node that is not in slobrok. Until fleetcontroller detects this, we expect 10% of the requests to go to wrong node. */ - @Test - @Ignore // FIXME test has been implicitly disabled for ages, figure out and fix - public void testPersistentFailureNodeNotInSlobrok() { - runSimulation("First correctnode .*, wrongnode 11, downnode 0, worked .*, failed .* " - + "Last correctnode .*, wrongnode 9, downnode 0, worked 100, failed 0", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.NODE_NOT_IN_SLOBROK))); - } - - /** With two failures, one marked down, hopefully the one not marked down doesn't lead us to use the one marked down. */ - @Test - @Ignore // FIXME test has been implicitly disabled for ages, figure out and fix - public void testPersistentFailureTwoNodesFailingOneMarkedDown() { - // We see that we don't send to down nodes in second half. 
We still fail requests towards the one not marked down, - // and occasionally send to random due to this - runSimulation("First correctnode .*, wrongnode 23, downnode .*, worked .*, failed .* " - + "Last correctnode .*, wrongnode 4, downnode 0, worked 219, failed 31", - new PersistentFailureTestParameters().addBadNode(new BadNode(3, FailureType.TRANSIENT_ERROR)) - .addBadNode(new BadNode(4, FailureType.TRANSIENT_ERROR).setDownInCurrentState()) - .setTotalRequests(500)); - // Note that we use extra requests here as with only 200 requests there was a pretty good chance of not going to any down node on random anyhow. - } - - // Left to test? - - // Cluster state down - Not overwrite last good nodes to send random to? - // Overwrite cached state or not? -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java deleted file mode 100644 index 479e0b0f422..00000000000 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
-package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy; - -import com.yahoo.collections.Pair; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.document.DocumentTypeManagerConfigurer; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy; -import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy; -import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; -import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory; -import com.yahoo.documentapi.messagebus.protocol.ContentPolicy; -import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; -import com.yahoo.documentapi.messagebus.protocol.test.PolicyTestFrame; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.routing.HopSpec; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNode; -import com.yahoo.text.Utf8Array; -import com.yahoo.vdslib.distribution.Distribution; -import com.yahoo.vdslib.distribution.RandomGen; -import org.junit.After; -import org.junit.Before; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public abstract class ContentPolicyTestEnvironment { - - protected StoragePolicyTestFactory policyFactory; - protected PolicyTestFrame frame; - private Set nodes; - protected static int[] bucketOneNodePreference = new int[]{ 3, 5, 7, 6, 8, 0, 9, 2, 1, 4 }; - protected boolean debug = true; - - @Before - public void setUp() throws Exception { - DocumentTypeManager manager = new DocumentTypeManager(); - 
DocumentTypeManagerConfigurer.configure(manager, "file:./test/cfg/testdoc.cfg"); - frame = new PolicyTestFrame(manager); - nodes = new TreeSet<>(); - DocumentProtocol protocol = (DocumentProtocol) frame.getMessageBus().getProtocol((Utf8Array)DocumentProtocol.NAME); - policyFactory = new StoragePolicyTestFactory(nodes); - protocol.putRoutingPolicyFactory("storage", policyFactory); - frame.setMessage(createMessage("id:ns:testdoc:n=1:foo")); - frame.setHop(new HopSpec("test", "[storage:cluster=foo]")); - } - - @After - public void tearDown() { - frame.destroy(); - } - - protected static Message createMessage(String id) { - Message msg = new RemoveDocumentMessage(new DocumentId(id)); - msg.getTrace().setLevel(9); - return msg; - } - - protected void setClusterNodes(int[] ints) { - Set clusterNodes = new TreeSet<>(); - for (int i=0; i extractClusterAndIndexFromPattern(String pattern) { - String[] bits = pattern.split("/"); - if (bits.length < 4) throw new IllegalStateException("Invalid pattern '" + pattern + "'. Expected more parts in it."); - String distributor = bits[3]; - String cluster = bits[1]; - if (cluster.indexOf('.') < 0) throw new IllegalStateException("Expected . 
in cluster spec '" + cluster + "'."); - cluster = cluster.substring(cluster.indexOf('.') + 1); - return new Pair<>(cluster, distributor); - } - - protected static Pair getAddress(RoutingNode node) { - Pair pair = extractClusterAndIndexFromPattern(node.getRoute().getHop(0).toString()); - return new Pair<>(pair.getFirst(), Integer.valueOf(pair.getSecond())); - } - - protected RoutingNode select() { - List result = frame.select(1); - assertEquals(1, result.size()); - return result.get(0); - } - - protected void addNode(int index) { - nodes.add(index); - } - protected void removeNode(int second) { - assertTrue(nodes.remove(second)); - } - - public static class TestHostFetcher extends ContentPolicy.HostFetcher { - private final String clusterName; - private RandomGen randomizer = new RandomGen(1234); - private final Set nodes; - private Integer avoidPickingAtRandom = null; - - public TestHostFetcher(String clusterName, Set nodes) { - super(60); - this.clusterName = clusterName; - this.nodes = nodes; - } - - public void setAvoidPickingAtRandom(Integer index) { avoidPickingAtRandom = index; } - - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - try{ - if (distributor == null) { - if (nodes.size() == 1) { - assertTrue(avoidPickingAtRandom != nodes.iterator().next()); - distributor = nodes.iterator().next(); - } else { - Iterator it = nodes.iterator(); - for (int i = 0, n = randomizer.nextInt(nodes.size() - 1); i nodes) { - super(SlobrokPolicy.parse(parameters)); - hostFetcher = new TestHostFetcher(getClusterName(), nodes); - distribution = new Distribution(Distribution.getDefaultDistributionConfig(2, 10)); - } - - @Override - public ContentPolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } - - @Override - public Distribution createDistribution(SlobrokPolicy policy) { return distribution; } - } - - public static class StoragePolicyTestFactory implements RoutingPolicyFactory { - private Set 
nodes; - private final LinkedList parameterInstances = new LinkedList(); - private Integer avoidPickingAtRandom = null; - - public StoragePolicyTestFactory(Set nodes) { - this.nodes = nodes; - } - public DocumentProtocolRoutingPolicy createPolicy(String parameters) { - parameterInstances.addLast(new TestParameters(parameters, nodes)); - ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); - return new ContentPolicy(parameterInstances.getLast()); - } - public void avoidPickingAtRandom(Integer distributor) { - avoidPickingAtRandom = distributor; - for (TestParameters params : parameterInstances) { - ((TestHostFetcher) params.createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); - } - } - public TestParameters getLastParameters() { return parameterInstances.getLast(); } - public void destroy() { - } - } - - private int findPreferredAvailableNodeForTestBucket() { - for (int i=0; i<10; ++i) { - if (nodes.contains(bucketOneNodePreference[i])) return bucketOneNodePreference[i]; - } - throw new IllegalStateException("Found no node available"); - } - - protected void sendToCorrectNode(String cluster, int correctNode) { - RoutingNode target = select(); - target.handleReply(new EmptyReply()); - Reply reply = frame.getReceptor().getReply(60); - assertNotNull(reply); - assertFalse(reply.hasErrors()); - assertEquals(reply.getTrace().toString(), "storage/cluster." + cluster + "/distributor/" + correctNode, target.getRoute().getHop(0).toString()); - } - - protected void replyWrongDistribution(RoutingNode target, String cluster, Integer randomNode, String clusterState) { - // We want test to send to wrong node when sending to random. 
If distribution changes so the first random - // node picked is the same node we should alter test - if (randomNode != null) { - assertFalse(randomNode == findPreferredAvailableNodeForTestBucket()); - } - target.handleReply(new WrongDistributionReply(clusterState)); - Reply reply = frame.getReceptor().getReply(60); - assertNotNull(reply); - assertFalse(reply.hasErrors()); - - // Verify that we sent to expected node - if (randomNode != null) { - assertEquals(reply.getTrace().toString(), "storage/cluster." + cluster + "/distributor/" + randomNode, target.getRoute().getHop(0).toString()); - } - if (debug) System.err.println("WRONG DISTRIBUTION: " + reply.getTrace()); - } - - protected void replyOk(RoutingNode target) { - target.handleReply(new EmptyReply()); - Reply reply = frame.getReceptor().getReply(60); - assertNotNull(reply); - assertFalse(reply.hasErrors()); - if (debug) System.err.println("OK: " + reply.getTrace()); - } - - protected void replyError(RoutingNode target, com.yahoo.messagebus.Error error) { - EmptyReply reply = new EmptyReply(); - reply.addError(error); - target.handleReply(reply); - assertTrue(reply == frame.getReceptor().getReply(60)); - assertTrue(reply.hasErrors()); - if (debug) System.err.println("ERROR: " + reply.getTrace()); - } - -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java index d23dd9ea998..68405b002c8 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java @@ -5,7 +5,7 @@ import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.document.DocumentId; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import 
com.yahoo.documentapi.messagebus.protocol.ContentPolicy; +import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; import com.yahoo.messagebus.routing.RoutingNode; import com.yahoo.vdslib.distribution.RandomGen; import com.yahoo.vdslib.state.ClusterState; @@ -21,7 +21,7 @@ import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public abstract class Simulator extends ContentPolicyTestEnvironment { +public abstract class Simulator extends StoragePolicyTestEnvironment { enum FailureType { TRANSIENT_ERROR, @@ -175,7 +175,7 @@ public abstract class Simulator extends ContentPolicyTestEnvironment { for (int i=0; i nodes; + protected static int[] bucketOneNodePreference = new int[]{ 3, 5, 7, 6, 8, 0, 9, 2, 1, 4 }; + protected boolean debug = true; + + @Before + public void setUp() throws Exception { + DocumentTypeManager manager = new DocumentTypeManager(); + DocumentTypeManagerConfigurer.configure(manager, "file:./test/cfg/testdoc.cfg"); + frame = new PolicyTestFrame(manager); + nodes = new TreeSet<>(); + DocumentProtocol protocol = (DocumentProtocol) frame.getMessageBus().getProtocol((Utf8Array)DocumentProtocol.NAME); + policyFactory = new StoragePolicyTestFactory(nodes); + protocol.putRoutingPolicyFactory("storage", policyFactory); + frame.setMessage(createMessage("id:ns:testdoc:n=1:foo")); + frame.setHop(new HopSpec("test", "[storage:cluster=foo]")); + } + + @After + public void tearDown() { + frame.destroy(); + } + + protected static Message createMessage(String id) { + Message msg = new RemoveDocumentMessage(new DocumentId(id)); + msg.getTrace().setLevel(9); + return msg; + } + + protected void setClusterNodes(int[] ints) { + Set clusterNodes = new TreeSet<>(); + for (int i=0; i extractClusterAndIndexFromPattern(String pattern) { + String[] bits = pattern.split("/"); + if (bits.length < 4) throw new IllegalStateException("Invalid pattern '" + pattern + "'. 
Expected more parts in it."); + String distributor = bits[3]; + String cluster = bits[1]; + if (cluster.indexOf('.') < 0) throw new IllegalStateException("Expected . in cluster spec '" + cluster + "'."); + cluster = cluster.substring(cluster.indexOf('.') + 1); + return new Pair<>(cluster, distributor); + } + + protected static Pair getAddress(RoutingNode node) { + Pair pair = extractClusterAndIndexFromPattern(node.getRoute().getHop(0).toString()); + return new Pair<>(pair.getFirst(), Integer.valueOf(pair.getSecond())); + } + + protected RoutingNode select() { + List result = frame.select(1); + assertEquals(1, result.size()); + return result.get(0); + } + + protected void addNode(int index) { + nodes.add(index); + } + protected void removeNode(int second) { + assertTrue(nodes.remove(second)); + } + + public static class TestHostFetcher extends StoragePolicy.HostFetcher { + private final String clusterName; + private RandomGen randomizer = new RandomGen(1234); + private final Set nodes; + private Integer avoidPickingAtRandom = null; + + public TestHostFetcher(String clusterName, Set nodes) { + super(60); + this.clusterName = clusterName; + this.nodes = nodes; + } + + public void setAvoidPickingAtRandom(Integer index) { avoidPickingAtRandom = index; } + + @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + try{ + if (distributor == null) { + if (nodes.size() == 1) { + assertTrue(avoidPickingAtRandom != nodes.iterator().next()); + distributor = nodes.iterator().next(); + } else { + Iterator it = nodes.iterator(); + for (int i = 0, n = randomizer.nextInt(nodes.size() - 1); i nodes) { + super(SlobrokPolicy.parse(parameters)); + hostFetcher = new TestHostFetcher(getClusterName(), nodes); + distribution = new Distribution(Distribution.getDefaultDistributionConfig(2, 10)); + } + + @Override + public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } + + @Override + public Distribution 
createDistribution(SlobrokPolicy policy) { return distribution; } + } + + public static class StoragePolicyTestFactory implements RoutingPolicyFactory { + private Set nodes; + private final LinkedList parameterInstances = new LinkedList(); + private Integer avoidPickingAtRandom = null; + + public StoragePolicyTestFactory(Set nodes) { + this.nodes = nodes; + } + public DocumentProtocolRoutingPolicy createPolicy(String parameters) { + parameterInstances.addLast(new TestParameters(parameters, nodes)); + ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); + return new StoragePolicy(parameterInstances.getLast()); + } + public void avoidPickingAtRandom(Integer distributor) { + avoidPickingAtRandom = distributor; + for (TestParameters params : parameterInstances) { + ((TestHostFetcher) params.createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); + } + } + public TestParameters getLastParameters() { return parameterInstances.getLast(); } + public void destroy() { + } + } + + private int findPreferredAvailableNodeForTestBucket() { + for (int i=0; i<10; ++i) { + if (nodes.contains(bucketOneNodePreference[i])) return bucketOneNodePreference[i]; + } + throw new IllegalStateException("Found no node available"); + } + + protected void sendToCorrectNode(String cluster, int correctNode) { + RoutingNode target = select(); + target.handleReply(new EmptyReply()); + Reply reply = frame.getReceptor().getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + assertEquals(reply.getTrace().toString(), "storage/cluster." + cluster + "/distributor/" + correctNode, target.getRoute().getHop(0).toString()); + } + + protected void replyWrongDistribution(RoutingNode target, String cluster, Integer randomNode, String clusterState) { + // We want test to send to wrong node when sending to random. 
If distribution changes so the first random + // node picked is the same node we should alter test + if (randomNode != null) { + assertFalse(randomNode == findPreferredAvailableNodeForTestBucket()); + } + target.handleReply(new WrongDistributionReply(clusterState)); + Reply reply = frame.getReceptor().getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + + // Verify that we sent to expected node + if (randomNode != null) { + assertEquals(reply.getTrace().toString(), "storage/cluster." + cluster + "/distributor/" + randomNode, target.getRoute().getHop(0).toString()); + } + if (debug) System.err.println("WRONG DISTRIBUTION: " + reply.getTrace()); + } + + protected void replyOk(RoutingNode target) { + target.handleReply(new EmptyReply()); + Reply reply = frame.getReceptor().getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + if (debug) System.err.println("OK: " + reply.getTrace()); + } + + protected void replyError(RoutingNode target, com.yahoo.messagebus.Error error) { + EmptyReply reply = new EmptyReply(); + reply.addError(error); + target.handleReply(reply); + assertTrue(reply == frame.getReceptor().getReply(60)); + assertTrue(reply.hasErrors()); + if (debug) System.err.println("ERROR: " + reply.getTrace()); + } + +} diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 02bd6b297d0..0c659f589d6 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -4,13 +4,13 @@ #include #include -#include #include #include #include #include #include #include +#include #include #include #include @@ -51,7 +51,7 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector &expected); void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); - ContentPolicy &setupContentPolicy(TestFrame &frame, const string ¶m, + StoragePolicy 
&setupStoragePolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); @@ -83,10 +83,10 @@ public: void requireThatExternPolicyWithUnknownPatternSelectsNone(); void requireThatExternPolicySelectsFromExternSlobrok(); void requireThatExternPolicyMergesOneReplyAsProtocol(); - void requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); - void requireThatContentPolicyIsRandomWithoutState(); - void requireThatContentPolicyIsTargetedWithState(); - void requireThatContentPolicyCombinesSystemAndSlobrokState(); + void requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); + void requireThatStoragePolicyIsRandomWithoutState(); + void requireThatStoragePolicyIsTargetedWithState(); + void requireThatStoragePolicyCombinesSystemAndSlobrokState(); }; TEST_APPHOOK(Test); @@ -128,10 +128,10 @@ Test::Main() { requireThatExternPolicySelectsFromExternSlobrok(); TEST_FLUSH(); requireThatExternPolicyMergesOneReplyAsProtocol(); TEST_FLUSH(); - requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); - requireThatContentPolicyIsRandomWithoutState(); TEST_FLUSH(); - requireThatContentPolicyIsTargetedWithState(); TEST_FLUSH(); - requireThatContentPolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); + requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); + requireThatStoragePolicyIsRandomWithoutState(); TEST_FLUSH(); + requireThatStoragePolicyIsTargetedWithState(); TEST_FLUSH(); + requireThatStoragePolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); TEST_DONE(); } @@ -782,15 +782,15 @@ void Test::testLoadBalancer() { } void -Test::requireThatContentPolicyWithIllegalParamIsAnErrorPolicy() +Test::requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy() { - EXPECT_TRUE(isErrorPolicy("Content", "")); - EXPECT_TRUE(isErrorPolicy("Content", "config=foo;slobroks=foo")); - EXPECT_TRUE(isErrorPolicy("Content", 
"slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Storage", "")); + EXPECT_TRUE(isErrorPolicy("Storage", "config=foo;slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Storage", "slobroks=foo")); } void -Test::requireThatContentPolicyIsRandomWithoutState() +Test::requireThatStoragePolicyIsRandomWithoutState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -808,7 +808,7 @@ Test::requireThatContentPolicyIsRandomWithoutState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - ContentPolicy &policy = setupContentPolicy( + StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -826,15 +826,15 @@ Test::requireThatContentPolicyIsRandomWithoutState() } } -ContentPolicy & -Test::setupContentPolicy(TestFrame &frame, const string ¶m, +StoragePolicy & +Test::setupStoragePolicy(TestFrame &frame, const string ¶m, const string &pattern, int32_t numEntries) { - frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Content:%s]", param.c_str()))); + frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Storage:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast(*hop->getDirective(0)); - ContentPolicy &policy = static_cast(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + StoragePolicy &policy = static_cast(*mbus.getRoutingPolicy(DocumentProtocol::NAME, dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); @@ -845,7 +845,7 @@ Test::setupContentPolicy(TestFrame &frame, const string ¶m, } void -Test::requireThatContentPolicyIsTargetedWithState() +Test::requireThatStoragePolicyIsTargetedWithState() { TestFrame 
frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -863,7 +863,7 @@ Test::requireThatContentPolicyIsTargetedWithState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - ContentPolicy &policy = setupContentPolicy( + StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -888,7 +888,7 @@ Test::requireThatContentPolicyIsTargetedWithState() } void -Test::requireThatContentPolicyCombinesSystemAndSlobrokState() +Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -902,7 +902,7 @@ Test::requireThatContentPolicyCombinesSystemAndSlobrokState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - ContentPolicy &policy = setupContentPolicy( + StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); ASSERT_TRUE(policy.getSystemState() == nullptr); diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 0f86eb38aca..560f2f28f0e 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr repo, // When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now. 
putRoutingPolicyFactory("AND", std::make_shared()); putRoutingPolicyFactory("Content", std::make_shared()); - putRoutingPolicyFactory("Storage", std::make_shared()); // TODO Vespa 8: remove + putRoutingPolicyFactory("MessageType", std::make_shared()); putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared(*_repo, cfg)); putRoutingPolicyFactory("Extern", std::make_shared()); - putRoutingPolicyFactory("LoadBalancer", std::make_shared()); putRoutingPolicyFactory("LocalService", std::make_shared()); - putRoutingPolicyFactory("MessageType", std::make_shared()); putRoutingPolicyFactory("RoundRobin", std::make_shared()); + putRoutingPolicyFactory("Storage", std::make_shared()); putRoutingPolicyFactory("SubsetService", std::make_shared()); + putRoutingPolicyFactory("LoadBalancer", std::make_shared()); // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 83e1df02a24..26d51e702e9 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp externslobrokpolicy.cpp + storagepolicy.cpp contentpolicy.cpp messagetypepolicy.cpp documentrouteselectorpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index 7150794653f..aea393a60af 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -1,79 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "contentpolicy.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -LOG_SETUP(".contentpolicy"); - -using vespalib::make_string; namespace documentapi { ContentPolicy::ContentPolicy(const string& param) - : ExternSlobrokPolicy(parse(param)), - _bucketIdFactory() -{ - std::map params(parse(param)); - - if (params.find("cluster") != params.end()) { - _clusterName = params.find("cluster")->second; - } else { - _error = "Required parameter clustername not set"; - } - - if (params.find("clusterconfigid") != params.end()) { - _clusterConfigId = params.find("clusterconfigid")->second; - } -} - -namespace { - class CallBack : public config::IFetcherCallback - { - public: - CallBack(ContentPolicy & policy) : _policy(policy) { } - void configure(std::unique_ptr config) override { - _policy.configure(std::move(config)); - } - private: - ContentPolicy & _policy; - }; -} -string ContentPolicy::init() -{ - string error = ExternSlobrokPolicy::init(); - if (!error.empty()) { - return error; - } - - if (_clusterConfigId.empty()) { - _clusterConfigId = createConfigId(_clusterName); - } - - using storage::lib::Distribution; - config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); - } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); - } - _callBack = std::make_unique(*this); - _configFetcher->subscribe(uri.getConfigId(), static_cast(_callBack.get())); - _configFetcher->start(); - return ""; -} - -ContentPolicy::~ContentPolicy() = default; + : StoragePolicy(param) +{ } string ContentPolicy::createConfigId(const string & clusterName) const @@ -81,177 +14,4 @@ ContentPolicy::createConfigId(const string & clusterName) const return clusterName; } -string -ContentPolicy::createPattern(const string & clusterName, int distributor) const -{ - vespalib::asciistream ost; - - ost << 
"storage/cluster." << clusterName << "/distributor/"; - - if (distributor == -1) { - ost << '*'; - } else { - ost << distributor; - } - ost << "/default"; - return ost.str(); -} - -void -ContentPolicy::configure(std::unique_ptr config) -{ - try { - _nextDistribution = std::make_unique(*config); - } catch (const std::exception& e) { - LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); - throw e; - } -} - -void -ContentPolicy::doSelect(mbus::RoutingContext &context) -{ - const mbus::Message &msg = context.getMessage(); - - int distributor = -1; - - if (_state.get()) { - document::BucketId id; - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast(msg).getDocument().getId()); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentUpdate().getId()); - break; - - case DocumentProtocol::MESSAGE_STATBUCKET: - id = static_cast(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_GETBUCKETLIST: - id = static_cast(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_CREATEVISITOR: - id = static_cast(msg).getBuckets()[0]; - break; - - case DocumentProtocol::MESSAGE_REMOVELOCATION: - id = static_cast(msg).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - - // _P_A_R_A_N_O_I_A_ - if (id.getRawId() == 0) { - mbus::Reply::UP reply(new mbus::EmptyReply()); - reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, - "No bucket id available in message.")); - context.setReply(std::move(reply)); - return; - } - - // Pick a distributor using ideal state 
algorithm - try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); - } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique(_state->toString()); - reply->addError(mbus::Error( - DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(std::move(reply)); - return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target - _state.reset(); - distributor = -1; - } - } - - mbus::Hop hop = getRecipient(context, distributor); - - if (distributor != -1 && !hop.hasDirectives()) { - hop = getRecipient(context, -1); - } - - if (hop.hasDirectives()) { - mbus::Route route = context.getRoute(); - route.setHop(0, hop); - context.addChild(route); - } else { - context.setError( - mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, - make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); - } -} - -mbus::Hop -ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) -{ - slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); - - if (!entries.empty()) { - return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); - } - - return mbus::Hop(); -} - -void -ContentPolicy::merge(mbus::RoutingContext &context) -{ - mbus::RoutingNodeIterator it = context.getChildIterator(); - mbus::Reply::UP reply = it.removeReply(); - - if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast(*reply)); - } else if (reply->hasErrors()) { - _state.reset(); - } - - 
context.setReply(std::move(reply)); -} - -void -ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) -{ - std::unique_ptr newState( - new storage::lib::ClusterState(wdr.getSystemState())); - if (!_state || newState->getVersion() >= _state->getVersion()) { - if (_state) { - wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", - _state->getVersion(), newState->getVersion())); - } else { - wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); - } - - _state = std::move(newState); - } else { - wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " - "while old state had version %d. New states should not have a lower version than the old.", - newState->getVersion(), _state->getVersion())); - _state.reset(); - } -} - } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index e29fbb75524..4b2f356c740 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -1,62 +1,17 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#pragma once -#include "externslobrokpolicy.h" -#include -#include -#include -#include -#include -#include - -namespace config { - class ICallback; - class ConfigFetcher; -} - -namespace storage { -namespace lib { - class Distribution; - class ClusterState; -} -} +#include "storagepolicy.h" namespace documentapi { -class ContentPolicy : public ExternSlobrokPolicy +class ContentPolicy : public StoragePolicy { -private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr _callBack; - std::unique_ptr _configFetcher; - std::unique_ptr _distribution; - std::unique_ptr _nextDistribution; - - mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); - public: ContentPolicy(const string& param); - ~ContentPolicy(); - void doSelect(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; - - void updateStateFromReply(WrongDistributionReply& reply); - - /** - * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. - */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } - - virtual void configure(std::unique_ptr config); - string init() override; - private: - string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; + string createConfigId(const string & clusterName) const override; }; } + diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp new file mode 100644 index 00000000000..3fc1df0352a --- /dev/null +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp @@ -0,0 +1,257 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "storagepolicy.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +LOG_SETUP(".storagepolicy"); + +using vespalib::make_string; + +namespace documentapi { + +StoragePolicy::StoragePolicy(const string& param) + : ExternSlobrokPolicy(parse(param)), + _bucketIdFactory() +{ + std::map params(parse(param)); + + if (params.find("cluster") != params.end()) { + _clusterName = params.find("cluster")->second; + } else { + _error = "Required parameter clustername not set"; + } + + if (params.find("clusterconfigid") != params.end()) { + _clusterConfigId = params.find("clusterconfigid")->second; + } +} + +namespace { + class CallBack : public config::IFetcherCallback + { + public: + CallBack(StoragePolicy & policy) : _policy(policy) { } + void configure(std::unique_ptr config) override { + _policy.configure(std::move(config)); + } + private: + StoragePolicy & _policy; + }; +} +string StoragePolicy::init() +{ + string error = ExternSlobrokPolicy::init(); + if (!error.empty()) { + return error; + } + + if (_clusterConfigId.empty()) { + _clusterConfigId = createConfigId(_clusterName); + } + + using storage::lib::Distribution; + config::ConfigUri uri(_clusterConfigId); + if (!_configSources.empty()) { + _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + } else { + _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + } + _callBack = std::make_unique(*this); + _configFetcher->subscribe(uri.getConfigId(), static_cast(_callBack.get())); + _configFetcher->start(); + return ""; +} + +StoragePolicy::~StoragePolicy() = default; + +string +StoragePolicy::createConfigId(const string & clusterName) const +{ + return "storage/cluster." + clusterName; +} + +string +StoragePolicy::createPattern(const string & clusterName, int distributor) const +{ + vespalib::asciistream ost; + + ost << "storage/cluster." 
<< clusterName << "/distributor/"; + + if (distributor == -1) { + ost << '*'; + } else { + ost << distributor; + } + ost << "/default"; + return ost.str(); +} + +void +StoragePolicy::configure(std::unique_ptr config) +{ + try { + _nextDistribution = std::make_unique(*config); + } catch (const std::exception& e) { + LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); + throw e; + } +} + +void +StoragePolicy::doSelect(mbus::RoutingContext &context) +{ + const mbus::Message &msg = context.getMessage(); + + int distributor = -1; + + if (_state.get()) { + document::BucketId id; + switch(msg.getType()) { + case DocumentProtocol::MESSAGE_PUTDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast(msg).getDocument().getId()); + break; + + case DocumentProtocol::MESSAGE_GETDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast(msg).getDocumentUpdate().getId()); + break; + + case DocumentProtocol::MESSAGE_STATBUCKET: + id = static_cast(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_GETBUCKETLIST: + id = static_cast(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_CREATEVISITOR: + id = static_cast(msg).getBuckets()[0]; + break; + + case DocumentProtocol::MESSAGE_REMOVELOCATION: + id = static_cast(msg).getBucketId(); + break; + + default: + LOG(error, "Message type '%d' not supported.", msg.getType()); + return; + } + + // _P_A_R_A_N_O_I_A_ + if (id.getRawId() == 0) { + mbus::Reply::UP reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, + "No bucket id available in message.")); + context.setReply(std::move(reply)); + return; + } + + // Pick a distributor using ideal state algorithm + try { 
+ // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { + _distribution = std::move(_nextDistribution); + _nextDistribution.reset(); + } + assert(_distribution.get()); + distributor = _distribution->getIdealDistributorNode(*_state, id); + } catch (storage::lib::TooFewBucketBitsInUseException& e) { + auto reply = std::make_unique(_state->toString()); + reply->addError(mbus::Error( + DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(std::move(reply)); + return; + } catch (storage::lib::NoDistributorsAvailableException& e) { + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target + _state.reset(); + distributor = -1; + } + } + + mbus::Hop hop = getRecipient(context, distributor); + + if (distributor != -1 && !hop.hasDirectives()) { + hop = getRecipient(context, -1); + } + + if (hop.hasDirectives()) { + mbus::Route route = context.getRoute(); + route.setHop(0, hop); + context.addChild(route); + } else { + context.setError( + mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, + make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); + } +} + +mbus::Hop +StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor) +{ + slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); + + if (!entries.empty()) { + return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); + } + + return mbus::Hop(); +} + +void +StoragePolicy::merge(mbus::RoutingContext &context) +{ + mbus::RoutingNodeIterator it = context.getChildIterator(); + mbus::Reply::UP reply = it.removeReply(); + + if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { + updateStateFromReply(static_cast(*reply)); + } else if (reply->hasErrors()) { + _state.reset(); + } + + context.setReply(std::move(reply)); +} + 
+void +StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) +{ + std::unique_ptr newState( + new storage::lib::ClusterState(wdr.getSystemState())); + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { + wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", + _state->getVersion(), newState->getVersion())); + } else { + wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); + } + + _state = std::move(newState); + } else { + wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " + "while old state had version %d. New states should not have a lower version than the old.", + newState->getVersion(), _state->getVersion())); + _state.reset(); + } +} + +} // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h new file mode 100644 index 00000000000..5cd2efcbbd3 --- /dev/null +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h @@ -0,0 +1,63 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include "externslobrokpolicy.h" +#include +#include +#include +#include +#include +#include + +namespace config { + class ICallback; + class ConfigFetcher; +} + +namespace storage { +namespace lib { + class Distribution; + class ClusterState; +} +} + +namespace documentapi { + +class StoragePolicy : public ExternSlobrokPolicy +{ +private: + document::BucketIdFactory _bucketIdFactory; + std::unique_ptr _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr _callBack; + std::unique_ptr _configFetcher; + std::unique_ptr _distribution; + std::unique_ptr _nextDistribution; + + mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + +public: + StoragePolicy(const string& param); + ~StoragePolicy(); + void doSelect(mbus::RoutingContext &context) override; + void merge(mbus::RoutingContext &context) override; + + void updateStateFromReply(WrongDistributionReply& reply); + + /** + * @return a pointer to the system state registered with this policy. If + * we haven't received a system state yet, returns NULL. + */ + const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + + virtual void configure(std::unique_ptr config); + string init() override; + +private: + virtual string createConfigId(const string & clusterName) const; + string createPattern(const string & clusterName, int distributor) const; +}; + +} + diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index f945fe8cd02..2c244c63046 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -1,15 +1,16 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "routingpolicyfactories.h" #include -#include #include #include #include -#include #include -#include #include #include +#include +#include +#include +#include using namespace documentapi; @@ -19,6 +20,17 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string ¶m) cons return mbus::IRoutingPolicy::UP(new ANDPolicy(param)); } +mbus::IRoutingPolicy::UP +RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string ¶m) const +{ + mbus::IRoutingPolicy::UP ret(new StoragePolicy(param)); + string error = static_cast(*ret).getError(); + if (!error.empty()) { + ret.reset(new ErrorPolicy(error)); + } + return ret; +} + mbus::IRoutingPolicy::UP RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string ¶m) const { diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index 533ad93e644..e2bf5119c58 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -16,6 +16,10 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; + class StoragePolicyFactory : public IRoutingPolicyFactory { + public: + mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; + }; class MessageTypePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; -- cgit v1.2.3