Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java')
-rw-r--r--  documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java  615
1 file changed, 615 insertions, 0 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
new file mode 100644
index 00000000000..b74f7431531
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -0,0 +1,615 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.BucketIdFactory;
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingContext;
+import com.yahoo.messagebus.routing.RoutingNodeIterator;
+import com.yahoo.messagebus.routing.VerbatimDirective;
+import com.yahoo.vdslib.distribution.Distribution;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vdslib.state.State;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * Routing policy to determine which distributor in a storage cluster to send data to.
+ * Using key=value parameters separated by semicolons (";"), the user can control which cluster to send to.
+ *
+ * cluster=[clusterName] (Mandatory, determines the cluster name)
+ * config=[config] (Optional, a comma-separated list of config servers to use. Used to talk to clusters not defined in this Vespa application)
+ * clusterconfigid=[id] (Optional, use the given config id for distribution instead of the default)
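+ * Example parameter string (hypothetical cluster name): "cluster=mycluster;clusterconfigid=storage/cluster.mycluster"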
+ *
+ * @author Haakon Humberset
+ */
+public class StoragePolicy extends SlobrokPolicy {
+
+ private static final Logger log = Logger.getLogger(StoragePolicy.class.getName());
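+    // Node states considered able to own buckets (assumed vdslib state codes: u=up, i=initializing, m=maintenance).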
+ public static final String owningBucketStates = "uim";
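+    // Node states considered valid targets for random sends (assumed codes: u=up, i=initializing).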
+ private static final String upStates = "ui";
+
+    /** This class merely generates a slobrok host pattern for a given distributor. */
+ public static class SlobrokHostPatternGenerator {
+ private final String base;
+ private final String all;
+ SlobrokHostPatternGenerator(String clusterName) {
+ base = "storage/cluster." + clusterName + "/distributor/";
+ all = base + "*/default";
+
+ }
+
+ /**
+         * Returns the host pattern matching the hosts that are valid targets for this request.
+         * @param distributor The distributor index, or null if any distributor is a valid target.
+ */
+ public String getDistributorHostPattern(Integer distributor) {
+ return (distributor == null) ? all : (base + distributor + "/default");
+ }
+ }
+
+    /** Helper class resolving which node to use from a given host pattern. */
+ public abstract static class HostFetcher {
+
+ private static class Targets {
+ private final List<Integer> list;
+ private final int total;
+ Targets() {
+ this(Collections.emptyList(), 1);
+ }
+ Targets(List<Integer> list, int total) {
+ this.list = list;
+ this.total = total;
+ }
+ }
+
+ private final int requiredUpPercentageToSendToKnownGoodNodes;
+ private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
+ protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
+
+ protected HostFetcher(int percent) {
+ requiredUpPercentageToSendToKnownGoodNodes = percent;
+ }
+
+ void updateValidTargets(ClusterState state) {
+ List<Integer> validRandomTargets = new ArrayList<>();
+ for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
+ if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
+ }
+ validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR)));
+ }
+ public abstract String getTargetSpec(Integer distributor, RoutingContext context);
+ String getRandomTargetSpec(RoutingContext context) {
+ Targets targets = validTargets.get();
+ // Try to use list of random targets, if at least X % of the nodes are up
+ while ((targets.total != 0) &&
+ (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes))
+ {
+ int randIndex = randomizer.nextInt(targets.list.size());
+ String targetSpec = getTargetSpec(targets.list.get(randIndex), context);
+ if (targetSpec != null) {
+ context.trace(3, "Sending to random node seen up in cluster state");
+ return targetSpec;
+ }
+ targets.list.remove(randIndex);
+ }
+ context.trace(3, "Too few nodes seen up in state. Sending totally random.");
+ return getTargetSpec(null, context);
+ }
+ public void close() {}
+ }
+
+ /** Host fetcher using a slobrok mirror to find the hosts. */
+ public static class SlobrokHostFetcher extends HostFetcher {
+ private final SlobrokHostPatternGenerator patternGenerator;
+ private final SlobrokPolicy policy;
+
+ SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(percent);
+ this.patternGenerator = patternGenerator;
+ this.policy = policy;
+ }
+
+ private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
+ return policy.lookup(context, hostPattern);
+ }
+
+ private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }
+
+ public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
+
+ @Override
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
+ if (arr.isEmpty()) return null;
+ if (distributor != null) {
+ if (arr.size() == 1) {
+ return convertSlobrokNameToSessionName(arr.get(0).getSpecString());
+ } else {
+ log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
+ }
+ } else {
+ return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString());
+ }
+ return null;
+ }
+ }
+
+ static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
+
+ /**
+         * Cache from distributor index to resolved RPC spec for a single Slobrok
+         * update generation. Uses a thread-safe copy-on-write map which will grow until stable.
+ */
+ private static class GenerationCache {
+ private final int generation;
+ private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
+
+ GenerationCache(int generation) {
+ this.generation = generation;
+ }
+
+ public int generation() { return this.generation; }
+
+ public String get(Integer index) {
+ return targets.get(index);
+ }
+ public void put(Integer index, String target) {
+ targets.put(index, target);
+ }
+ }
+
+ private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
+
+ TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(patternGenerator, policy, percent);
+ }
+
+ @Override
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ GenerationCache cache = generationCache.get();
+ int currentGeneration = getMirror(context).updates();
+ // The below code might race with other threads during a generation change. That is OK, as the cache
+ // is thread safe and will quickly converge to a stable state for the new generation.
+ if (cache == null || currentGeneration != cache.generation()) {
+ cache = new GenerationCache(currentGeneration);
+ generationCache.set(cache);
+ }
+ if (distributor != null) {
+ return cachingGetTargetSpec(distributor, context, cache);
+ }
+ // Wildcard lookup case. Must not be cached.
+ return super.getTargetSpec(null, context);
+ }
+
+ private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
+ String cachedTarget = cache.get(distributor);
+ if (cachedTarget != null) {
+ return cachedTarget;
+ }
+ // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
+ // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
+ // next invocation.
+ String resolvedTarget = super.getTargetSpec(distributor, context);
+ cache.put(distributor, resolvedTarget);
+ return resolvedTarget;
+ }
+
+ }
+
+    /** Class parsing the semicolon-separated parameter string and exposing the appropriate values to the policy. */
+ public static class Parameters {
+ protected final String clusterName;
+ protected final String distributionConfigId;
+ protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
+
+ public Parameters(Map<String, String> params) {
+ clusterName = params.get("cluster");
+ distributionConfigId = params.get("clusterconfigid");
+ slobrokHostPatternGenerator = createPatternGenerator();
+ if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
+ }
+
+ String getDistributionConfigId() {
+ return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId);
+ }
+ public String getClusterName() {
+ return clusterName;
+ }
+ public SlobrokHostPatternGenerator createPatternGenerator() {
+ return new SlobrokHostPatternGenerator(getClusterName());
+ }
+ public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
+ return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
+ }
+ public Distribution createDistribution(SlobrokPolicy policy) {
+ return new Distribution(getDistributionConfigId());
+ }
+
+ /**
+         * When we have seen this many failures from a node (failures of any kind), we try sending to a random other node instead,
+         * to check whether the failures were caused by the node being bad (which is hard to tell from the failures themselves).
+ */
+ int getAttemptRandomOnFailuresLimit() { return 5; }
+
+ /**
+         * If we receive more than this number of wrong distribution replies carrying old cluster states, we discard the currently cached state and
+         * accept the old one. This guards us against cluster state version resets.
+ */
+ int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }
+
+ /**
+         * When receiving new cluster states we update the set of known good nodes. If more than this percentage of nodes are up, we send to one of the
+         * up nodes instead of a totally random node (to avoid hitting thrashing bad nodes that are still registered in slobrok).
+ */
+ int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
+ }
+
+ /** Helper class to get the bucket identifier of a message. */
+ public static class BucketIdCalculator {
+ private static final BucketIdFactory factory = new BucketIdFactory();
+
+ private BucketId getBucketId(Message msg) {
+ switch (msg.getType()) {
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
+ case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
+ case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0);
+ case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId();
+ default:
+ log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
+ return null;
+ }
+ }
+
+ BucketId handleBucketIdCalculation(RoutingContext context) {
+ BucketId id = getBucketId(context.getMessage());
+ if (id == null || id.getRawId() == 0) {
+ Reply reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
+ context.setReply(reply);
+ }
+ return id;
+ }
+ }
+
+ /** Class handling the logic of picking a distributor */
+ public static class DistributorSelectionLogic {
+ /** Class that tracks a failure of a given type per node. */
+ static class InstabilityChecker {
+ private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
+ private final int failureLimit;
+
+ InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
+
+ boolean tooManyFailures(int nodeIndex) {
+ if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
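+                    // Reset the counter so the node gets a fresh chance after one random fallback send.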
+ nodeFailures.set(nodeIndex, 0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void addFailure(Integer calculatedDistributor) {
+ while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
+ nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
+ }
+ }
+ /** Message context class. Contains data we want to inspect about a request at reply time. */
+ private static class MessageContext {
+ final Integer calculatedDistributor;
+ final ClusterState usedState;
+
+ MessageContext() {
+ this(null, null);
+ }
+ MessageContext(ClusterState usedState) {
+ this(usedState, null);
+ }
+ MessageContext(ClusterState usedState, Integer calculatedDistributor) {
+ this.calculatedDistributor = calculatedDistributor;
+ this.usedState = usedState;
+ }
+
+ public String toString() {
+ return "Context(Distributor " + calculatedDistributor +
+ ", state version " + usedState.getVersion() + ")";
+ }
+ }
+
+ private final HostFetcher hostFetcher;
+ private final Distribution distribution;
+ private final InstabilityChecker persistentFailureChecker;
+ private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
+ private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
+ private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
+
+ DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
+ try {
+ hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
+ distribution = params.createDistribution(policy);
+ persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
+ maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
+ } catch (Throwable e) {
+ destroy();
+ throw e;
+ }
+ }
+
+ public void destroy() {
+ if (hostFetcher != null) {
+ hostFetcher.close();
+ }
+ if (distribution != null) {
+ distribution.close();
+ }
+ }
+
+ String getTargetSpec(RoutingContext context, BucketId bucketId) {
+ String sendRandomReason = null;
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+
+ if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
+ try{
+ Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
+ // If we have had too many failures towards existing node, reset failure count and send to random
+ if (persistentFailureChecker.tooManyFailures(target)) {
+ sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
+ target = null;
+ }
+ // If we have found a target, and the target exists in slobrok, send to it.
+ if (target != null) {
+ context.setContext(new MessageContext(cachedClusterState, target));
+ String targetSpec = hostFetcher.getTargetSpec(target, context);
+ if (targetSpec != null) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Using distributor " + target + " for " +
+ bucketId + " as our state version is " + cachedClusterState.getVersion());
+ }
+ return targetSpec;
+ } else {
+ sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
+ log.log(Level.FINE, "Target distributor is not in slobrok");
+ }
+ } else {
+ context.setContext(new MessageContext(cachedClusterState));
+ }
+ } catch (Distribution.TooFewBucketBitsInUseException e) {
+ Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
+ reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
+ "Too few distribution bits used for given cluster state"));
+ context.setReply(reply);
+ return null;
+ } catch (Distribution.NoDistributorsAvailableException e) {
+ log.log(Level.FINE, "No distributors available; clearing cluster state");
+ safeCachedClusterState.set(null);
+ sendRandomReason = "No distributors available. Sending to random distributor.";
+ context.setContext(createRandomDistributorTargetContext());
+ }
+ } else {
+ context.setContext(createRandomDistributorTargetContext());
+ sendRandomReason = "No cluster state cached. Sending to random distributor.";
+ }
+ if (context.shouldTrace(1)) {
+ context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
+ }
+ return hostFetcher.getRandomTargetSpec(context);
+ }
+
+ private static MessageContext createRandomDistributorTargetContext() {
+ return new MessageContext(null);
+ }
+
+ private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
+ try {
+ return Optional.of(new ClusterState(reply.getSystemState()));
+ } catch (Exception e) {
+ reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
+ return Optional.empty();
+ }
+ }
+
+ void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
+ final MessageContext context = (MessageContext) routingContext.getContext();
+ final Optional<ClusterState> replyState = clusterStateFromReply(reply);
+ if (!replyState.isPresent()) {
+ return;
+ }
+ final ClusterState newState = replyState.get();
+ resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
+ markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
+
+ if (context.calculatedDistributor == null) {
+ traceReplyFromRandomDistributor(reply, newState);
+ } else {
+ traceReplyFromSpecificDistributor(reply, context, newState);
+ }
+ updateCachedRoutingStateFromWrongDistribution(context, newState);
+ }
+
+ private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
+ safeCachedClusterState.set(newState);
+ if (newState.getClusterState().equals(State.UP)) {
+ hostFetcher.updateValidTargets(newState);
+ }
+ } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
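+            // The version jumped far backwards, which we assume indicates a cluster state version reset; drop the cached state entirely.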
+ safeCachedClusterState.set(null);
+ } else if (context.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(context.calculatedDistributor);
+ }
+ }
+
+ private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState == null) {
+ String msg = "Used state must be set as distributor is calculated. Bug.";
+ reply.getTrace().trace(1, msg);
+ log.log(Level.SEVERE, msg);
+ } else if (newState.getVersion() == context.usedState.getVersion()) {
+ String msg = "Message sent to distributor " + context.calculatedDistributor +
+ " retrieved cluster state version " + newState.getVersion() +
+ " which was the state we used to calculate distributor as target last time.";
+ reply.getTrace().trace(1, msg);
+ // Client load can be rejected towards distributors even with a matching cluster state version.
+ // This usually happens during a node fail-over transition, where the target distributor will
+ // reject an operation bound to a particular bucket if it does not own the bucket in _both_
+ // the current and the next (transition target) state. Since it can happen during normal operation
+ // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
+ log.log(Level.FINE, msg);
+ } else if (newState.getVersion() > context.usedState.getVersion()) {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " updated cluster state from version " + context.usedState.getVersion() +
+ " to " + newState.getVersion());
+ }
+ } else {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " returned older cluster state version " + newState.getVersion());
+ }
+ }
+ }
+
+ private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
+ if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
+ oldClusterVersionGottenCount.set(0);
+ safeCachedClusterState.set(null);
+ }
+ }
+ }
+
+ private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
+ if (reply.getRetryDelay() <= 0.0) {
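+                // The reply carried no newer state than the one we used; -1 is assumed to fall back to the sender's default retry delay.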
+ reply.setRetryDelay(-1);
+ }
+ } else {
+ if (reply.getRetryDelay() <= 0.0) {
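+                // A newer cluster state was observed; retry immediately, since the cached routing state is about to be updated.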
+ reply.setRetryDelay(0);
+ }
+ }
+ }
+
+ private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
+ if (!reply.getTrace().shouldTrace(1)) {
+ return;
+ }
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null) {
+ reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
+ } else if (newState.getVersion() == cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
+ } else if (newState.getVersion() > cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
+ } else {
+ reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
+ }
+ }
+
+ void handleErrorReply(Reply reply, Object untypedContext) {
+ MessageContext messageContext = (MessageContext) untypedContext;
+ if (messageContext.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Failed with " + messageContext.toString());
+ }
+ }
+ }
+ }
+
+ private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
+ private final DistributorSelectionLogic distributorSelectionLogic;
+ private final Parameters parameters;
+
+ /** Constructor used in production. */
+ public StoragePolicy(String param) {
+ this(parse(param));
+ }
+
+ public StoragePolicy(Map<String, String> params) {
+ this(new Parameters(params));
+ }
+
+    /** Constructor taking the parsed parameters directly, so tests can override what they need to. */
+ public StoragePolicy(Parameters p) {
+ super();
+ parameters = p;
+ distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
+ }
+
+ @Override
+ public void select(RoutingContext context) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Selecting route");
+ }
+
+ BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
+ if (context.hasReply()) return;
+
+ String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
+ if (context.hasReply()) return;
+ if (targetSpec != null) {
+ Route route = new Route(context.getRoute());
+ route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
+ context.addChild(route);
+ } else {
+ context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
+ }
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ RoutingNodeIterator it = context.getChildIterator();
+ Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply();
+ if (reply == null) {
+ reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "No reply in any children, nor in the routing context: " + context));
+ }
+
+ if (reply instanceof WrongDistributionReply) {
+ distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
+ } else if (reply.hasErrors()) {
+ distributorSelectionLogic.handleErrorReply(reply, context.getContext());
+ } else if (reply instanceof WriteDocumentReply) {
+ if (context.shouldTrace(9)) {
+ context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
+ }
+ }
+ context.setReply(reply);
+ }
+
+ @Override
+ public void destroy() {
+ distributorSelectionLogic.destroy();
+ }
+}