summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-12-22 15:38:43 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-12-22 15:38:43 +0100
commit7dd7edf46500c62e0e02a1833c69bc8d9d45d71b (patch)
tree84c5d2acf408923bb8b5e0f8068d2da76240dbd5 /documentapi
parentaf6c9b690f485ddaede7b85394e3850fd52aeb64 (diff)
Merge StoragePolicy down into ContentPolicy, and always use this
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/abi-spec.json174
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java602
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java615
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java4
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java2
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java)2
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java)12
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java6
9 files changed, 669 insertions, 750 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index a70f59bb9fb..39d6898215c 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -1519,30 +1519,92 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$ContentParameters": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters",
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$BucketIdCalculator": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$DistributorSelectionLogic": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void destroy()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "abstract"
+ ],
+ "methods": [
+ "protected void <init>(int)",
+ "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)",
+ "public void close()"
+ ],
+ "fields": [
+ "protected final java.util.Random randomizer"
+ ]
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters": {
+ "superClass": "java.lang.Object",
"interfaces": [],
"attributes": [
"public"
],
"methods": [
"public void <init>(java.util.Map)",
- "public java.lang.String getDistributionConfigId()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()"
+ "public java.lang.String getClusterName()",
+ "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator createPatternGenerator()",
+ "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)",
+ "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)"
+ ],
+ "fields": [
+ "protected final java.lang.String clusterName",
+ "protected final java.lang.String distributionConfigId",
+ "protected final com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator"
+ ]
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostFetcher": {
+ "superClass": "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)",
+ "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)"
],
"fields": []
},
"com.yahoo.documentapi.messagebus.protocol.ContentPolicy": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy",
+ "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy",
"interfaces": [],
"attributes": [
"public"
],
"methods": [
+ "public void <init>(java.lang.String)",
"public void <init>(java.util.Map)",
- "public void <init>(java.lang.String)"
+ "public void <init>(com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters)",
+ "public void select(com.yahoo.messagebus.routing.RoutingContext)",
+ "public void merge(com.yahoo.messagebus.routing.RoutingContext)",
+ "public void destroy()"
],
- "fields": []
+ "fields": [
+ "public static final java.lang.String owningBucketStates"
+ ]
},
"com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage": {
"superClass": "com.yahoo.documentapi.messagebus.protocol.DocumentMessage",
@@ -2979,104 +3041,6 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$BucketIdCalculator": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>()"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$DistributorSelectionLogic": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void destroy()"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public",
- "abstract"
- ],
- "methods": [
- "protected void <init>(int)",
- "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)",
- "public void close()"
- ],
- "fields": [
- "protected final java.util.Random randomizer"
- ]
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.util.Map)",
- "public java.lang.String getClusterName()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)",
- "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)"
- ],
- "fields": [
- "protected final java.lang.String clusterName",
- "protected final java.lang.String distributionConfigId",
- "protected final com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator"
- ]
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostFetcher": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)",
- "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public java.lang.String getDistributorHostPattern(java.lang.Integer)"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.lang.String)",
- "public void <init>(java.util.Map)",
- "public void <init>(com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters)",
- "public void select(com.yahoo.messagebus.routing.RoutingContext)",
- "public void merge(com.yahoo.messagebus.routing.RoutingContext)",
- "public void destroy()"
- ],
- "fields": [
- "public static final java.lang.String owningBucketStates"
- ]
- },
"com.yahoo.documentapi.messagebus.protocol.SubsetServicePolicy": {
"superClass": "java.lang.Object",
"interfaces": [
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index d6e20b9d57f..bc537af6ade 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -1,43 +1,613 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.BucketIdFactory;
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingContext;
+import com.yahoo.messagebus.routing.RoutingNodeIterator;
+import com.yahoo.messagebus.routing.VerbatimDirective;
+import com.yahoo.vdslib.distribution.Distribution;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vdslib.state.State;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
/**
- * Policy to talk to content clusters.
+ * Routing policy to determine which distributor in a content cluster to send data to.
+ * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to.
+ *
+ * cluster=[clusterName] (Mandatory, determines the cluster name)
+ * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application)
+ * clusterconfigid=[id] (Optional, use given config id for distribution instead of default)
+ *
+ * @author Haakon Humberset
*/
-public class ContentPolicy extends StoragePolicy {
+public class ContentPolicy extends SlobrokPolicy {
+
+ private static final Logger log = Logger.getLogger(ContentPolicy.class.getName());
+ public static final String owningBucketStates = "uim";
+ private static final String upStates = "ui";
+
+ /** This class merely generates a slobrok host pattern for a given distributor. */
+ static class SlobrokHostPatternGenerator {
+
+ private final String base;
+
+ SlobrokHostPatternGenerator(String clusterName) {
+ this.base = "storage/cluster." + clusterName + "/distributor/";
+ }
+
+ /**
+ * Find host pattern of the hosts that are valid targets for this request.
+ *
+ * @param distributor Set to null if any distributor is valid target.
+ */
+ String getDistributorHostPattern(Integer distributor) {
+ return base + (distributor == null ? "*" : distributor) + "/default";
+ }
+
+ }
+
+ /** Helper class to match a host pattern with node to use. */
+ public abstract static class HostFetcher {
+
+ private static class Targets {
+ private final List<Integer> list;
+ private final int total;
+ Targets() {
+ this(Collections.emptyList(), 1);
+ }
+ Targets(List<Integer> list, int total) {
+ this.list = list;
+ this.total = total;
+ }
+ }
+
+ private final int requiredUpPercentageToSendToKnownGoodNodes;
+ private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
+ protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
+
+ protected HostFetcher(int percent) {
+ requiredUpPercentageToSendToKnownGoodNodes = percent;
+ }
+
+ void updateValidTargets(ClusterState state) {
+ List<Integer> validRandomTargets = new ArrayList<>();
+ for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
+ if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
+ }
+ validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR)));
+ }
+ public abstract String getTargetSpec(Integer distributor, RoutingContext context);
+ String getRandomTargetSpec(RoutingContext context) {
+ Targets targets = validTargets.get();
+ // Try to use list of random targets, if at least X % of the nodes are up
+ while ((targets.total != 0) &&
+ (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes))
+ {
+ int randIndex = randomizer.nextInt(targets.list.size());
+ String targetSpec = getTargetSpec(targets.list.get(randIndex), context);
+ if (targetSpec != null) {
+ context.trace(3, "Sending to random node seen up in cluster state");
+ return targetSpec;
+ }
+ targets.list.remove(randIndex);
+ }
+ context.trace(3, "Too few nodes seen up in state. Sending totally random.");
+ return getTargetSpec(null, context);
+ }
+ public void close() {}
+ }
+
+ /** Host fetcher using a slobrok mirror to find the hosts. */
+ public static class SlobrokHostFetcher extends HostFetcher {
+ private final SlobrokHostPatternGenerator patternGenerator;
+ private final SlobrokPolicy policy;
- public static class ContentParameters extends Parameters {
+ SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(percent);
+ this.patternGenerator = patternGenerator;
+ this.policy = policy;
+ }
- public ContentParameters(Map<String, String> parameters) {
- super(parameters);
+ private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
+ return policy.lookup(context, hostPattern);
}
+ private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }
+
+ public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
+
@Override
- public String getDistributionConfigId() {
- if (distributionConfigId != null) {
- return distributionConfigId;
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
+ if (arr.isEmpty()) return null;
+ if (distributor != null) {
+ if (arr.size() == 1) {
+ return convertSlobrokNameToSessionName(arr.get(0).getSpecString());
+ } else {
+ log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
+ }
+ } else {
+ return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString());
}
- return clusterName;
+ return null;
+ }
+ }
+
+ static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
+
+ /**
+ * Distributor index to resolved RPC spec cache for a single given Slobrok
+ * update generation. Uses a thread safe COW map which will grow until stable.
+ */
+ private static class GenerationCache {
+ private final int generation;
+ private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
+
+ GenerationCache(int generation) {
+ this.generation = generation;
+ }
+
+ public int generation() { return this.generation; }
+
+ public String get(Integer index) {
+ return targets.get(index);
+ }
+ public void put(Integer index, String target) {
+ targets.put(index, target);
+ }
+ }
+
+ private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
+
+ TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(patternGenerator, policy, percent);
}
@Override
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ GenerationCache cache = generationCache.get();
+ int currentGeneration = getMirror(context).updates();
+ // The below code might race with other threads during a generation change. That is OK, as the cache
+ // is thread safe and will quickly converge to a stable state for the new generation.
+ if (cache == null || currentGeneration != cache.generation()) {
+ cache = new GenerationCache(currentGeneration);
+ generationCache.set(cache);
+ }
+ if (distributor != null) {
+ return cachingGetTargetSpec(distributor, context, cache);
+ }
+ // Wildcard lookup case. Must not be cached.
+ return super.getTargetSpec(null, context);
+ }
+
+ private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
+ String cachedTarget = cache.get(distributor);
+ if (cachedTarget != null) {
+ return cachedTarget;
+ }
+ // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
+ // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
+ // next invocation.
+ String resolvedTarget = super.getTargetSpec(distributor, context);
+ cache.put(distributor, resolvedTarget);
+ return resolvedTarget;
+ }
+
+ }
+
+ /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
+ public static class Parameters {
+ protected final String clusterName;
+ protected final String distributionConfigId;
+ protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
+
+ public Parameters(Map<String, String> params) {
+ clusterName = params.get("cluster");
+ distributionConfigId = params.get("clusterconfigid");
+ slobrokHostPatternGenerator = createPatternGenerator();
+ if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
+ }
+
+ String getDistributionConfigId() {
+ return distributionConfigId == null ? clusterName : distributionConfigId;
+ }
+ public String getClusterName() {
+ return clusterName;
+ }
public SlobrokHostPatternGenerator createPatternGenerator() {
- return new SlobrokHostPatternGenerator(getClusterName()) {
- public String getDistributorHostPattern(Integer distributor) {
- return "storage/cluster." + getClusterName() + "/distributor/" + (distributor == null ? "*" : distributor) + "/default";
+ return new SlobrokHostPatternGenerator(getClusterName());
+ }
+ public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
+ return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
+ }
+ public Distribution createDistribution(SlobrokPolicy policy) {
+ return new Distribution(getDistributionConfigId());
+ }
+
+ /**
+ * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the
+ * failure was related to node being bad. (Hard to detect from failure)
+ */
+ int getAttemptRandomOnFailuresLimit() { return 5; }
+
+ /**
+ * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the
+ * old one. This guards us against version resets.
+ */
+ int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }
+
+ /**
+ * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random.
+ * (To avoid hitting trashing bad nodes still in slobrok)
+ */
+ int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
+ }
+
+ /** Helper class to get the bucket identifier of a message. */
+ public static class BucketIdCalculator {
+ private static final BucketIdFactory factory = new BucketIdFactory();
+
+ private BucketId getBucketId(Message msg) {
+ switch (msg.getType()) {
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
+ case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
+ case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0);
+ case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId();
+ default:
+ log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
+ return null;
+ }
+ }
+
+ BucketId handleBucketIdCalculation(RoutingContext context) {
+ BucketId id = getBucketId(context.getMessage());
+ if (id == null || id.getRawId() == 0) {
+ Reply reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
+ context.setReply(reply);
+ }
+ return id;
+ }
+ }
+
+ /** Class handling the logic of picking a distributor */
+ public static class DistributorSelectionLogic {
+ /** Class that tracks a failure of a given type per node. */
+ static class InstabilityChecker {
+ private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
+ private final int failureLimit;
+
+ InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
+
+ boolean tooManyFailures(int nodeIndex) {
+ if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
+ nodeFailures.set(nodeIndex, 0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void addFailure(Integer calculatedDistributor) {
+ while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
+ nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
+ }
+ }
+ /** Message context class. Contains data we want to inspect about a request at reply time. */
+ private static class MessageContext {
+ final Integer calculatedDistributor;
+ final ClusterState usedState;
+
+ MessageContext(ClusterState usedState) {
+ this(usedState, null);
+ }
+ MessageContext(ClusterState usedState, Integer calculatedDistributor) {
+ this.calculatedDistributor = calculatedDistributor;
+ this.usedState = usedState;
+ }
+
+ public String toString() {
+ return "Context(Distributor " + calculatedDistributor +
+ ", state version " + usedState.getVersion() + ")";
+ }
+ }
+
+ private final HostFetcher hostFetcher;
+ private final Distribution distribution;
+ private final InstabilityChecker persistentFailureChecker;
+ private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
+ private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
+ private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
+
+ DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
+ try {
+ hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
+ distribution = params.createDistribution(policy);
+ persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
+ maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
+ } catch (Throwable e) {
+ destroy();
+ throw e;
+ }
+ }
+
+ public void destroy() {
+ if (hostFetcher != null) {
+ hostFetcher.close();
+ }
+ if (distribution != null) {
+ distribution.close();
+ }
+ }
+
+ String getTargetSpec(RoutingContext context, BucketId bucketId) {
+ String sendRandomReason = null;
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+
+ if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
+ try{
+ Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
+ // If we have had too many failures towards existing node, reset failure count and send to random
+ if (persistentFailureChecker.tooManyFailures(target)) {
+ sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
+ target = null;
+ }
+ // If we have found a target, and the target exists in slobrok, send to it.
+ if (target != null) {
+ context.setContext(new MessageContext(cachedClusterState, target));
+ String targetSpec = hostFetcher.getTargetSpec(target, context);
+ if (targetSpec != null) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Using distributor " + target + " for " +
+ bucketId + " as our state version is " + cachedClusterState.getVersion());
+ }
+ return targetSpec;
+ } else {
+ sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
+ log.log(Level.FINE, "Target distributor is not in slobrok");
+ }
+ } else {
+ context.setContext(new MessageContext(cachedClusterState));
+ }
+ } catch (Distribution.TooFewBucketBitsInUseException e) {
+ Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
+ reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
+ "Too few distribution bits used for given cluster state"));
+ context.setReply(reply);
+ return null;
+ } catch (Distribution.NoDistributorsAvailableException e) {
+ log.log(Level.FINE, "No distributors available; clearing cluster state");
+ safeCachedClusterState.set(null);
+ sendRandomReason = "No distributors available. Sending to random distributor.";
+ context.setContext(createRandomDistributorTargetContext());
+ }
+ } else {
+ context.setContext(createRandomDistributorTargetContext());
+ sendRandomReason = "No cluster state cached. Sending to random distributor.";
+ }
+ if (context.shouldTrace(1)) {
+ context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
+ }
+ return hostFetcher.getRandomTargetSpec(context);
+ }
+
+ private static MessageContext createRandomDistributorTargetContext() {
+ return new MessageContext(null);
+ }
+
+ private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
+ try {
+ return Optional.of(new ClusterState(reply.getSystemState()));
+ } catch (Exception e) {
+ reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
+ return Optional.empty();
+ }
+ }
+
+ void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
+ final MessageContext context = (MessageContext) routingContext.getContext();
+ final Optional<ClusterState> replyState = clusterStateFromReply(reply);
+ if (!replyState.isPresent()) {
+ return;
+ }
+ final ClusterState newState = replyState.get();
+ resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
+ markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
+
+ if (context.calculatedDistributor == null) {
+ traceReplyFromRandomDistributor(reply, newState);
+ } else {
+ traceReplyFromSpecificDistributor(reply, context, newState);
+ }
+ updateCachedRoutingStateFromWrongDistribution(context, newState);
+ }
+
+ private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
+ safeCachedClusterState.set(newState);
+ if (newState.getClusterState().equals(State.UP)) {
+ hostFetcher.updateValidTargets(newState);
+ }
+ } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
+ safeCachedClusterState.set(null);
+ } else if (context.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(context.calculatedDistributor);
+ }
+ }
+
+ private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState == null) {
+ String msg = "Used state must be set as distributor is calculated. Bug.";
+ reply.getTrace().trace(1, msg);
+ log.log(Level.SEVERE, msg);
+ } else if (newState.getVersion() == context.usedState.getVersion()) {
+ String msg = "Message sent to distributor " + context.calculatedDistributor +
+ " retrieved cluster state version " + newState.getVersion() +
+ " which was the state we used to calculate distributor as target last time.";
+ reply.getTrace().trace(1, msg);
+ // Client load can be rejected towards distributors even with a matching cluster state version.
+ // This usually happens during a node fail-over transition, where the target distributor will
+ // reject an operation bound to a particular bucket if it does not own the bucket in _both_
+ // the current and the next (transition target) state. Since it can happen during normal operation
+ // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
+ log.log(Level.FINE, msg);
+ } else if (newState.getVersion() > context.usedState.getVersion()) {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " updated cluster state from version " + context.usedState.getVersion() +
+ " to " + newState.getVersion());
+ }
+ } else {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " returned older cluster state version " + newState.getVersion());
+ }
+ }
+ }
+
+ private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
+ if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
+ oldClusterVersionGottenCount.set(0);
+ safeCachedClusterState.set(null);
+ }
+ }
+ }
+
+ private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
+ if (reply.getRetryDelay() <= 0.0) {
+ reply.setRetryDelay(-1);
+ }
+ } else {
+ if (reply.getRetryDelay() <= 0.0) {
+ reply.setRetryDelay(0);
+ }
+ }
+ }
+
+ private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
+ if (!reply.getTrace().shouldTrace(1)) {
+ return;
+ }
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null) {
+ reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
+ } else if (newState.getVersion() == cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
+ } else if (newState.getVersion() > cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
+ } else {
+ reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
+ }
+ }
+
+ void handleErrorReply(Reply reply, Object untypedContext) {
+ MessageContext messageContext = (MessageContext) untypedContext;
+ if (messageContext.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Failed with " + messageContext.toString());
}
- };
+ }
}
}
+ private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
+ private final DistributorSelectionLogic distributorSelectionLogic;
+ private final Parameters parameters;
+
+ /** Constructor used in production. */
+ public ContentPolicy(String param) {
+ this(parse(param));
+ }
+
public ContentPolicy(Map<String, String> params) {
- super(new ContentParameters(params));
+ this(new Parameters(params));
}
- public ContentPolicy(String parameters) {
- this(parse(parameters));
+ /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
+ public ContentPolicy(Parameters p) {
+ super();
+ parameters = p;
+ distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
}
+ @Override
+ public void select(RoutingContext context) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Selecting route");
+ }
+
+ BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
+ if (context.hasReply()) return;
+
+ String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
+ if (context.hasReply()) return;
+ if (targetSpec != null) {
+ Route route = new Route(context.getRoute());
+ route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
+ context.addChild(route);
+ } else {
+ context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
+ }
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ RoutingNodeIterator it = context.getChildIterator();
+ Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply();
+ if (reply == null) {
+ reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "No reply in any children, nor in the routing context: " + context));
+ }
+
+ if (reply instanceof WrongDistributionReply) {
+ distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
+ } else if (reply.hasErrors()) {
+ distributorSelectionLogic.handleErrorReply(reply, context.getContext());
+ } else if (reply instanceof WriteDocumentReply) {
+ if (context.shouldTrace(9)) {
+ context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
+ }
+ }
+ context.setReply(reply);
+ }
+
+ @Override
+ public void destroy() {
+ distributorSelectionLogic.destroy();
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
index 6954d8f3a1d..87a2fbe44e7 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
@@ -18,7 +18,7 @@ public abstract class RoutingPolicyFactories {
static class StoragePolicyFactory implements RoutingPolicyFactory {
public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new StoragePolicy(param);
+ return new ContentPolicy(param);
}
public void destroy() {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
deleted file mode 100644
index b74f7431531..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ /dev/null
@@ -1,615 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.document.BucketId;
-import com.yahoo.document.BucketIdFactory;
-import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import java.util.logging.Level;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-import com.yahoo.messagebus.routing.VerbatimDirective;
-import com.yahoo.vdslib.distribution.Distribution;
-import com.yahoo.vdslib.state.ClusterState;
-import com.yahoo.vdslib.state.Node;
-import com.yahoo.vdslib.state.NodeType;
-import com.yahoo.vdslib.state.State;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-/**
- * Routing policy to determine which distributor in a storage cluster to send data to.
- * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to.
- *
- * cluster=[clusterName] (Mandatory, determines the cluster name)
- * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application)
- * clusterconfigid=[id] (Optional, use given config id for distribution instead of default)
- *
- * @author Haakon Humberset
- */
-public class StoragePolicy extends SlobrokPolicy {
-
- private static final Logger log = Logger.getLogger(StoragePolicy.class.getName());
- public static final String owningBucketStates = "uim";
- private static final String upStates = "ui";
-
- /** This class merely generates slobrok a host pattern for a given distributor. */
- public static class SlobrokHostPatternGenerator {
- private final String base;
- private final String all;
- SlobrokHostPatternGenerator(String clusterName) {
- base = "storage/cluster." + clusterName + "/distributor/";
- all = base + "*/default";
-
- }
-
- /**
- * Find host pattern of the hosts that are valid targets for this request.
- * @param distributor Set to -1 if any distributor is valid target.
- */
- public String getDistributorHostPattern(Integer distributor) {
- return (distributor == null) ? all : (base + distributor + "/default");
- }
- }
-
- /** Helper class to match a host pattern with node to use. */
- public abstract static class HostFetcher {
-
- private static class Targets {
- private final List<Integer> list;
- private final int total;
- Targets() {
- this(Collections.emptyList(), 1);
- }
- Targets(List<Integer> list, int total) {
- this.list = list;
- this.total = total;
- }
- }
-
- private final int requiredUpPercentageToSendToKnownGoodNodes;
- private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
- protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
-
- protected HostFetcher(int percent) {
- requiredUpPercentageToSendToKnownGoodNodes = percent;
- }
-
- void updateValidTargets(ClusterState state) {
- List<Integer> validRandomTargets = new ArrayList<>();
- for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
- if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
- }
- validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR)));
- }
- public abstract String getTargetSpec(Integer distributor, RoutingContext context);
- String getRandomTargetSpec(RoutingContext context) {
- Targets targets = validTargets.get();
- // Try to use list of random targets, if at least X % of the nodes are up
- while ((targets.total != 0) &&
- (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes))
- {
- int randIndex = randomizer.nextInt(targets.list.size());
- String targetSpec = getTargetSpec(targets.list.get(randIndex), context);
- if (targetSpec != null) {
- context.trace(3, "Sending to random node seen up in cluster state");
- return targetSpec;
- }
- targets.list.remove(randIndex);
- }
- context.trace(3, "Too few nodes seen up in state. Sending totally random.");
- return getTargetSpec(null, context);
- }
- public void close() {}
- }
-
- /** Host fetcher using a slobrok mirror to find the hosts. */
- public static class SlobrokHostFetcher extends HostFetcher {
- private final SlobrokHostPatternGenerator patternGenerator;
- private final SlobrokPolicy policy;
-
- SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
- super(percent);
- this.patternGenerator = patternGenerator;
- this.policy = policy;
- }
-
- private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
- return policy.lookup(context, hostPattern);
- }
-
- private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }
-
- public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
-
- @Override
- public String getTargetSpec(Integer distributor, RoutingContext context) {
- List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
- if (arr.isEmpty()) return null;
- if (distributor != null) {
- if (arr.size() == 1) {
- return convertSlobrokNameToSessionName(arr.get(0).getSpecString());
- } else {
- log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
- }
- } else {
- return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString());
- }
- return null;
- }
- }
-
- static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
-
- /**
- * Distributor index to resolved RPC spec cache for a single given Slobrok
- * update generation. Uses a thread safe COW map which will grow until stable.
- */
- private static class GenerationCache {
- private final int generation;
- private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
-
- GenerationCache(int generation) {
- this.generation = generation;
- }
-
- public int generation() { return this.generation; }
-
- public String get(Integer index) {
- return targets.get(index);
- }
- public void put(Integer index, String target) {
- targets.put(index, target);
- }
- }
-
- private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
-
- TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
- super(patternGenerator, policy, percent);
- }
-
- @Override
- public String getTargetSpec(Integer distributor, RoutingContext context) {
- GenerationCache cache = generationCache.get();
- int currentGeneration = getMirror(context).updates();
- // The below code might race with other threads during a generation change. That is OK, as the cache
- // is thread safe and will quickly converge to a stable state for the new generation.
- if (cache == null || currentGeneration != cache.generation()) {
- cache = new GenerationCache(currentGeneration);
- generationCache.set(cache);
- }
- if (distributor != null) {
- return cachingGetTargetSpec(distributor, context, cache);
- }
- // Wildcard lookup case. Must not be cached.
- return super.getTargetSpec(null, context);
- }
-
- private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
- String cachedTarget = cache.get(distributor);
- if (cachedTarget != null) {
- return cachedTarget;
- }
- // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
- // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
- // next invocation.
- String resolvedTarget = super.getTargetSpec(distributor, context);
- cache.put(distributor, resolvedTarget);
- return resolvedTarget;
- }
-
- }
-
- /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
- public static class Parameters {
- protected final String clusterName;
- protected final String distributionConfigId;
- protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
-
- public Parameters(Map<String, String> params) {
- clusterName = params.get("cluster");
- distributionConfigId = params.get("clusterconfigid");
- slobrokHostPatternGenerator = createPatternGenerator();
- if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
- }
-
- String getDistributionConfigId() {
- return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId);
- }
- public String getClusterName() {
- return clusterName;
- }
- public SlobrokHostPatternGenerator createPatternGenerator() {
- return new SlobrokHostPatternGenerator(getClusterName());
- }
- public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
- return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
- }
- public Distribution createDistribution(SlobrokPolicy policy) {
- return new Distribution(getDistributionConfigId());
- }
-
- /**
- * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the
- * failure was related to node being bad. (Hard to detect from failure)
- */
- int getAttemptRandomOnFailuresLimit() { return 5; }
-
- /**
- * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the
- * old one. This guards us against version resets.
- */
- int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }
-
- /**
- * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random.
- * (To avoid hitting trashing bad nodes still in slobrok)
- */
- int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
- }
-
- /** Helper class to get the bucket identifier of a message. */
- public static class BucketIdCalculator {
- private static final BucketIdFactory factory = new BucketIdFactory();
-
- private BucketId getBucketId(Message msg) {
- switch (msg.getType()) {
- case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
- case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
- case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId();
- case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId();
- case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0);
- case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId();
- default:
- log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
- return null;
- }
- }
-
- BucketId handleBucketIdCalculation(RoutingContext context) {
- BucketId id = getBucketId(context.getMessage());
- if (id == null || id.getRawId() == 0) {
- Reply reply = new EmptyReply();
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
- context.setReply(reply);
- }
- return id;
- }
- }
-
- /** Class handling the logic of picking a distributor */
- public static class DistributorSelectionLogic {
- /** Class that tracks a failure of a given type per node. */
- static class InstabilityChecker {
- private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
- private final int failureLimit;
-
- InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
-
- boolean tooManyFailures(int nodeIndex) {
- if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
- nodeFailures.set(nodeIndex, 0);
- return true;
- } else {
- return false;
- }
- }
-
- void addFailure(Integer calculatedDistributor) {
- while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
- nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
- }
- }
- /** Message context class. Contains data we want to inspect about a request at reply time. */
- private static class MessageContext {
- final Integer calculatedDistributor;
- final ClusterState usedState;
-
- MessageContext() {
- this(null, null);
- }
- MessageContext(ClusterState usedState) {
- this(usedState, null);
- }
- MessageContext(ClusterState usedState, Integer calculatedDistributor) {
- this.calculatedDistributor = calculatedDistributor;
- this.usedState = usedState;
- }
-
- public String toString() {
- return "Context(Distributor " + calculatedDistributor +
- ", state version " + usedState.getVersion() + ")";
- }
- }
-
- private final HostFetcher hostFetcher;
- private final Distribution distribution;
- private final InstabilityChecker persistentFailureChecker;
- private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
- private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
- private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
-
- DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
- try {
- hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
- distribution = params.createDistribution(policy);
- persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
- maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
- } catch (Throwable e) {
- destroy();
- throw e;
- }
- }
-
- public void destroy() {
- if (hostFetcher != null) {
- hostFetcher.close();
- }
- if (distribution != null) {
- distribution.close();
- }
- }
-
- String getTargetSpec(RoutingContext context, BucketId bucketId) {
- String sendRandomReason = null;
- ClusterState cachedClusterState = safeCachedClusterState.get();
-
- if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
- try{
- Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
- // If we have had too many failures towards existing node, reset failure count and send to random
- if (persistentFailureChecker.tooManyFailures(target)) {
- sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
- target = null;
- }
- // If we have found a target, and the target exists in slobrok, send to it.
- if (target != null) {
- context.setContext(new MessageContext(cachedClusterState, target));
- String targetSpec = hostFetcher.getTargetSpec(target, context);
- if (targetSpec != null) {
- if (context.shouldTrace(1)) {
- context.trace(1, "Using distributor " + target + " for " +
- bucketId + " as our state version is " + cachedClusterState.getVersion());
- }
- return targetSpec;
- } else {
- sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
- log.log(Level.FINE, "Target distributor is not in slobrok");
- }
- } else {
- context.setContext(new MessageContext(cachedClusterState));
- }
- } catch (Distribution.TooFewBucketBitsInUseException e) {
- Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
- reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
- "Too few distribution bits used for given cluster state"));
- context.setReply(reply);
- return null;
- } catch (Distribution.NoDistributorsAvailableException e) {
- log.log(Level.FINE, "No distributors available; clearing cluster state");
- safeCachedClusterState.set(null);
- sendRandomReason = "No distributors available. Sending to random distributor.";
- context.setContext(createRandomDistributorTargetContext());
- }
- } else {
- context.setContext(createRandomDistributorTargetContext());
- sendRandomReason = "No cluster state cached. Sending to random distributor.";
- }
- if (context.shouldTrace(1)) {
- context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
- }
- return hostFetcher.getRandomTargetSpec(context);
- }
-
- private static MessageContext createRandomDistributorTargetContext() {
- return new MessageContext(null);
- }
-
- private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
- try {
- return Optional.of(new ClusterState(reply.getSystemState()));
- } catch (Exception e) {
- reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
- return Optional.empty();
- }
- }
-
- void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
- final MessageContext context = (MessageContext) routingContext.getContext();
- final Optional<ClusterState> replyState = clusterStateFromReply(reply);
- if (!replyState.isPresent()) {
- return;
- }
- final ClusterState newState = replyState.get();
- resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
- markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
-
- if (context.calculatedDistributor == null) {
- traceReplyFromRandomDistributor(reply, newState);
- } else {
- traceReplyFromSpecificDistributor(reply, context, newState);
- }
- updateCachedRoutingStateFromWrongDistribution(context, newState);
- }
-
- private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
- safeCachedClusterState.set(newState);
- if (newState.getClusterState().equals(State.UP)) {
- hostFetcher.updateValidTargets(newState);
- }
- } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
- safeCachedClusterState.set(null);
- } else if (context.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(context.calculatedDistributor);
- }
- }
-
- private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
- if (context.usedState == null) {
- String msg = "Used state must be set as distributor is calculated. Bug.";
- reply.getTrace().trace(1, msg);
- log.log(Level.SEVERE, msg);
- } else if (newState.getVersion() == context.usedState.getVersion()) {
- String msg = "Message sent to distributor " + context.calculatedDistributor +
- " retrieved cluster state version " + newState.getVersion() +
- " which was the state we used to calculate distributor as target last time.";
- reply.getTrace().trace(1, msg);
- // Client load can be rejected towards distributors even with a matching cluster state version.
- // This usually happens during a node fail-over transition, where the target distributor will
- // reject an operation bound to a particular bucket if it does not own the bucket in _both_
- // the current and the next (transition target) state. Since it can happen during normal operation
- // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
- log.log(Level.FINE, msg);
- } else if (newState.getVersion() > context.usedState.getVersion()) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " updated cluster state from version " + context.usedState.getVersion() +
- " to " + newState.getVersion());
- }
- } else {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " returned older cluster state version " + newState.getVersion());
- }
- }
- }
-
- private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
- if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
- oldClusterVersionGottenCount.set(0);
- safeCachedClusterState.set(null);
- }
- }
- }
-
- private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
- if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
- if (reply.getRetryDelay() <= 0.0) {
- reply.setRetryDelay(-1);
- }
- } else {
- if (reply.getRetryDelay() <= 0.0) {
- reply.setRetryDelay(0);
- }
- }
- }
-
- private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
- if (!reply.getTrace().shouldTrace(1)) {
- return;
- }
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState == null) {
- reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
- } else if (newState.getVersion() == cachedClusterState.getVersion()) {
- reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
- } else if (newState.getVersion() > cachedClusterState.getVersion()) {
- reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
- } else {
- reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
- }
- }
-
- void handleErrorReply(Reply reply, Object untypedContext) {
- MessageContext messageContext = (MessageContext) untypedContext;
- if (messageContext.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Failed with " + messageContext.toString());
- }
- }
- }
- }
-
- private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
- private final DistributorSelectionLogic distributorSelectionLogic;
- private final Parameters parameters;
-
- /** Constructor used in production. */
- public StoragePolicy(String param) {
- this(parse(param));
- }
-
- public StoragePolicy(Map<String, String> params) {
- this(new Parameters(params));
- }
-
- /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
- public StoragePolicy(Parameters p) {
- super();
- parameters = p;
- distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
- }
-
- @Override
- public void select(RoutingContext context) {
- if (context.shouldTrace(1)) {
- context.trace(1, "Selecting route");
- }
-
- BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
- if (context.hasReply()) return;
-
- String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
- if (context.hasReply()) return;
- if (targetSpec != null) {
- Route route = new Route(context.getRoute());
- route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
- context.addChild(route);
- } else {
- context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- RoutingNodeIterator it = context.getChildIterator();
- Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply();
- if (reply == null) {
- reply = new EmptyReply();
- reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "No reply in any children, nor in the routing context: " + context));
- }
-
- if (reply instanceof WrongDistributionReply) {
- distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
- } else if (reply.hasErrors()) {
- distributorSelectionLogic.handleErrorReply(reply, context.getContext());
- } else if (reply instanceof WriteDocumentReply) {
- if (context.shouldTrace(9)) {
- context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
- }
- }
- context.setReply(reply);
- }
-
- @Override
- public void destroy() {
- distributorSelectionLogic.destroy();
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
index cdc5878321a..52afcdfd77c 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
@@ -49,8 +49,8 @@ public class TargetCachingSlobrokHostFetcherTest {
static class Fixture {
SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class);
IMirror mockMirror = mock(IMirror.class);
- StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo");
- StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
+ ContentPolicy.SlobrokHostPatternGenerator patternGenerator = new ContentPolicy.SlobrokHostPatternGenerator("foo");
+ ContentPolicy.TargetCachingSlobrokHostFetcher hostFetcher = new ContentPolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
RoutingContext routingContext = mock(RoutingContext.class);
Fixture() {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
index 8a6df061430..018697c0719 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
@@ -15,7 +15,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-public class BasicTests extends StoragePolicyTestEnvironment {
+public class BasicTests extends ContentPolicyTestEnvironment {
/** Test that we can send a message through the policy. */
@Test
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
index b0cea8ee819..f324245b612 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
@@ -4,7 +4,7 @@ package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy;
import org.junit.Ignore;
import org.junit.Test;
-public class StoragePolicyTest extends Simulator {
+public class ContentPolicyTest extends Simulator {
/**
* Verify that a resent message with failures doesn't ruin overall performance. (By dumping the cached state too often
* so other requests are sent to wrong target)
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
index 00a045367bb..479e0b0f422 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
@@ -10,7 +10,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy;
import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory;
-import com.yahoo.documentapi.messagebus.protocol.StoragePolicy;
+import com.yahoo.documentapi.messagebus.protocol.ContentPolicy;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.documentapi.messagebus.protocol.test.PolicyTestFrame;
import com.yahoo.messagebus.EmptyReply;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public abstract class StoragePolicyTestEnvironment {
+public abstract class ContentPolicyTestEnvironment {
protected StoragePolicyTestFactory policyFactory;
protected PolicyTestFrame frame;
@@ -102,7 +102,7 @@ public abstract class StoragePolicyTestEnvironment {
assertTrue(nodes.remove(second));
}
- public static class TestHostFetcher extends StoragePolicy.HostFetcher {
+ public static class TestHostFetcher extends ContentPolicy.HostFetcher {
private final String clusterName;
private RandomGen randomizer = new RandomGen(1234);
private final Set<Integer> nodes;
@@ -143,7 +143,7 @@ public abstract class StoragePolicyTestEnvironment {
}
}
- public static class TestParameters extends StoragePolicy.Parameters {
+ public static class TestParameters extends ContentPolicy.Parameters {
private final TestHostFetcher hostFetcher;
private final Distribution distribution;
@@ -154,7 +154,7 @@ public abstract class StoragePolicyTestEnvironment {
}
@Override
- public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; }
+ public ContentPolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; }
@Override
public Distribution createDistribution(SlobrokPolicy policy) { return distribution; }
@@ -171,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment {
public DocumentProtocolRoutingPolicy createPolicy(String parameters) {
parameterInstances.addLast(new TestParameters(parameters, nodes));
((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom);
- return new StoragePolicy(parameterInstances.getLast());
+ return new ContentPolicy(parameterInstances.getLast());
}
public void avoidPickingAtRandom(Integer distributor) {
avoidPickingAtRandom = distributor;
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
index 68405b002c8..d23dd9ea998 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
@@ -5,7 +5,7 @@ import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.document.DocumentId;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.StoragePolicy;
+import com.yahoo.documentapi.messagebus.protocol.ContentPolicy;
import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.vdslib.distribution.RandomGen;
import com.yahoo.vdslib.state.ClusterState;
@@ -21,7 +21,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public abstract class Simulator extends StoragePolicyTestEnvironment {
+public abstract class Simulator extends ContentPolicyTestEnvironment {
enum FailureType {
TRANSIENT_ERROR,
@@ -175,7 +175,7 @@ public abstract class Simulator extends StoragePolicyTestEnvironment {
for (int i=0; i<params.getParallellRequests(); ++i) {
RoutingNode target = targets[i];
int index = getAddress(target).getSecond();
- if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(StoragePolicy.owningBucketStates)) {
+ if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(ContentPolicy.owningBucketStates)) {
++downnode[half];
}
BadNode badNode = params.getBadNodes().get(index);