aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
commitbe5ea0ad39c15c13fb85a70d9990165499a92896 (patch)
treee92462f5f130fa68f40175ec7d987c661dd9ae0f /documentapi
parent6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff)
Revert "Revert "Jonmv/remove storage policy""
This reverts commit 75b2e4c11ea6463c335f1c77dab3fdb5493e5600.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/abi-spec.json174
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java602
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java9
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java615
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java4
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java2
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java)2
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java)12
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java6
-rw-r--r--documentapi/src/tests/policies/policies_test.cpp48
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp6
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt1
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp244
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h53
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp257
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h63
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp18
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h4
20 files changed, 991 insertions, 1133 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index a70f59bb9fb..39d6898215c 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -1519,30 +1519,92 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$ContentParameters": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters",
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$BucketIdCalculator": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$DistributorSelectionLogic": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void destroy()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "abstract"
+ ],
+ "methods": [
+ "protected void <init>(int)",
+ "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)",
+ "public void close()"
+ ],
+ "fields": [
+ "protected final java.util.Random randomizer"
+ ]
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters": {
+ "superClass": "java.lang.Object",
"interfaces": [],
"attributes": [
"public"
],
"methods": [
"public void <init>(java.util.Map)",
- "public java.lang.String getDistributionConfigId()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()"
+ "public java.lang.String getClusterName()",
+ "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator createPatternGenerator()",
+ "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)",
+ "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)"
+ ],
+ "fields": [
+ "protected final java.lang.String clusterName",
+ "protected final java.lang.String distributionConfigId",
+ "protected final com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator"
+ ]
+ },
+ "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostFetcher": {
+ "superClass": "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)",
+ "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)"
],
"fields": []
},
"com.yahoo.documentapi.messagebus.protocol.ContentPolicy": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy",
+ "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy",
"interfaces": [],
"attributes": [
"public"
],
"methods": [
+ "public void <init>(java.lang.String)",
"public void <init>(java.util.Map)",
- "public void <init>(java.lang.String)"
+ "public void <init>(com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters)",
+ "public void select(com.yahoo.messagebus.routing.RoutingContext)",
+ "public void merge(com.yahoo.messagebus.routing.RoutingContext)",
+ "public void destroy()"
],
- "fields": []
+ "fields": [
+ "public static final java.lang.String owningBucketStates"
+ ]
},
"com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage": {
"superClass": "com.yahoo.documentapi.messagebus.protocol.DocumentMessage",
@@ -2979,104 +3041,6 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$BucketIdCalculator": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>()"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$DistributorSelectionLogic": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void destroy()"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public",
- "abstract"
- ],
- "methods": [
- "protected void <init>(int)",
- "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)",
- "public void close()"
- ],
- "fields": [
- "protected final java.util.Random randomizer"
- ]
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.util.Map)",
- "public java.lang.String getClusterName()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()",
- "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)",
- "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)"
- ],
- "fields": [
- "protected final java.lang.String clusterName",
- "protected final java.lang.String distributionConfigId",
- "protected final com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator"
- ]
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostFetcher": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)",
- "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public java.lang.String getDistributorHostPattern(java.lang.Integer)"
- ],
- "fields": []
- },
- "com.yahoo.documentapi.messagebus.protocol.StoragePolicy": {
- "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.lang.String)",
- "public void <init>(java.util.Map)",
- "public void <init>(com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters)",
- "public void select(com.yahoo.messagebus.routing.RoutingContext)",
- "public void merge(com.yahoo.messagebus.routing.RoutingContext)",
- "public void destroy()"
- ],
- "fields": [
- "public static final java.lang.String owningBucketStates"
- ]
- },
"com.yahoo.documentapi.messagebus.protocol.SubsetServicePolicy": {
"superClass": "java.lang.Object",
"interfaces": [
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 257d491ea93..982a1c50b85 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -917,7 +917,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) {
- handleWrongDistributionReply((WrongDistributionReply)reply);
+ handleWrongDistributionReply((WrongDistributionReply) reply);
} else {
if (shouldReportError(reply)) {
reportVisitorError(message);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index d6e20b9d57f..2d78497456d 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -1,43 +1,613 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.document.BucketId;
+import com.yahoo.document.BucketIdFactory;
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingContext;
+import com.yahoo.messagebus.routing.RoutingNodeIterator;
+import com.yahoo.messagebus.routing.VerbatimDirective;
+import com.yahoo.vdslib.distribution.Distribution;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vdslib.state.State;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
/**
- * Policy to talk to content clusters.
+ * Routing policy to determine which distributor in a content cluster to send data to.
+ * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to.
+ *
+ * cluster=[clusterName] (Mandatory, determines the cluster name)
+ * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application)
+ * clusterconfigid=[id] (Optional, use given config id for distribution instead of default)
+ *
+ * @author Haakon Humberset
*/
-public class ContentPolicy extends StoragePolicy {
+public class ContentPolicy extends SlobrokPolicy {
+
+ private static final Logger log = Logger.getLogger(ContentPolicy.class.getName());
+ public static final String owningBucketStates = "uim";
+ private static final String upStates = "ui";
+
+ /** This class merely generates a slobrok host pattern for a given distributor. */
+ static class SlobrokHostPatternGenerator {
+
+ private final String base;
+
+ SlobrokHostPatternGenerator(String clusterName) {
+ this.base = "storage/cluster." + clusterName + "/distributor/";
+ }
+
+ /**
+ * Find host pattern of the hosts that are valid targets for this request.
+ *
+ * @param distributor Set to null if any distributor is valid target.
+ */
+ String getDistributorHostPattern(Integer distributor) {
+ return base + (distributor == null ? "*" : distributor) + "/default";
+ }
+
+ }
+
+ /** Helper class to match a host pattern with node to use. */
+ public abstract static class HostFetcher {
+
+ private static class Targets {
+ private final List<Integer> list;
+ private final int total;
+ Targets() {
+ this(Collections.emptyList(), 1);
+ }
+ Targets(List<Integer> list, int total) {
+ this.list = list;
+ this.total = total;
+ }
+ }
+
+ private final int requiredUpPercentageToSendToKnownGoodNodes;
+ private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
+ protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
+
+ protected HostFetcher(int percent) {
+ requiredUpPercentageToSendToKnownGoodNodes = percent;
+ }
+
+ void updateValidTargets(ClusterState state) {
+ List<Integer> validRandomTargets = new ArrayList<>();
+ for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
+ if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
+ }
+ validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR)));
+ }
+ public abstract String getTargetSpec(Integer distributor, RoutingContext context);
+ String getRandomTargetSpec(RoutingContext context) {
+ Targets targets = validTargets.get();
+ // Try to use list of random targets, if at least X % of the nodes are up
+ while ((targets.total != 0) &&
+ (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes))
+ {
+ int randIndex = randomizer.nextInt(targets.list.size());
+ String targetSpec = getTargetSpec(targets.list.get(randIndex), context);
+ if (targetSpec != null) {
+ context.trace(3, "Sending to random node seen up in cluster state");
+ return targetSpec;
+ }
+ targets.list.remove(randIndex);
+ }
+ context.trace(3, "Too few nodes seen up in state. Sending totally random.");
+ return getTargetSpec(null, context);
+ }
+ public void close() {}
+ }
+
+ /** Host fetcher using a slobrok mirror to find the hosts. */
+ public static class SlobrokHostFetcher extends HostFetcher {
+ private final SlobrokHostPatternGenerator patternGenerator;
+ private final SlobrokPolicy policy;
- public static class ContentParameters extends Parameters {
+ SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(percent);
+ this.patternGenerator = patternGenerator;
+ this.policy = policy;
+ }
- public ContentParameters(Map<String, String> parameters) {
- super(parameters);
+ private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
+ return policy.lookup(context, hostPattern);
}
+ private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }
+
+ public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
+
@Override
- public String getDistributionConfigId() {
- if (distributionConfigId != null) {
- return distributionConfigId;
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
+ if (arr.isEmpty()) return null;
+ if (distributor != null) {
+ if (arr.size() == 1) {
+ return convertSlobrokNameToSessionName(arr.get(0).getSpecString());
+ } else {
+ log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
+ }
+ } else {
+ return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString());
}
- return clusterName;
+ return null;
+ }
+ }
+
+ static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
+
+ /**
+ * Distributor index to resolved RPC spec cache for a single given Slobrok
+ * update generation. Uses a thread safe COW map which will grow until stable.
+ */
+ private static class GenerationCache {
+ private final int generation;
+ private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
+
+ GenerationCache(int generation) {
+ this.generation = generation;
+ }
+
+ public int generation() { return this.generation; }
+
+ public String get(Integer index) {
+ return targets.get(index);
+ }
+ public void put(Integer index, String target) {
+ targets.put(index, target);
+ }
+ }
+
+ private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
+
+ TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
+ super(patternGenerator, policy, percent);
}
@Override
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ GenerationCache cache = generationCache.get();
+ int currentGeneration = getMirror(context).updates();
+ // The below code might race with other threads during a generation change. That is OK, as the cache
+ // is thread safe and will quickly converge to a stable state for the new generation.
+ if (cache == null || currentGeneration != cache.generation()) {
+ cache = new GenerationCache(currentGeneration);
+ generationCache.set(cache);
+ }
+ if (distributor != null) {
+ return cachingGetTargetSpec(distributor, context, cache);
+ }
+ // Wildcard lookup case. Must not be cached.
+ return super.getTargetSpec(null, context);
+ }
+
+ private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
+ String cachedTarget = cache.get(distributor);
+ if (cachedTarget != null) {
+ return cachedTarget;
+ }
+ // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
+ // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
+ // next invocation.
+ String resolvedTarget = super.getTargetSpec(distributor, context);
+ cache.put(distributor, resolvedTarget);
+ return resolvedTarget;
+ }
+
+ }
+
+ /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
+ public static class Parameters {
+ protected final String clusterName;
+ protected final String distributionConfigId;
+ protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
+
+ public Parameters(Map<String, String> params) {
+ clusterName = params.get("cluster");
+ distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove
+ slobrokHostPatternGenerator = createPatternGenerator();
+ if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
+ }
+
+ String getDistributionConfigId() {
+ return distributionConfigId == null ? clusterName : distributionConfigId;
+ }
+ public String getClusterName() {
+ return clusterName;
+ }
public SlobrokHostPatternGenerator createPatternGenerator() {
- return new SlobrokHostPatternGenerator(getClusterName()) {
- public String getDistributorHostPattern(Integer distributor) {
- return "storage/cluster." + getClusterName() + "/distributor/" + (distributor == null ? "*" : distributor) + "/default";
+ return new SlobrokHostPatternGenerator(getClusterName());
+ }
+ public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
+ return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
+ }
+ public Distribution createDistribution(SlobrokPolicy policy) {
+ return new Distribution(getDistributionConfigId());
+ }
+
+ /**
+ * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the
+ * failure was related to node being bad. (Hard to detect from failure)
+ */
+ int getAttemptRandomOnFailuresLimit() { return 5; }
+
+ /**
+ * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the
+ * old one. This guards us against version resets.
+ */
+ int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }
+
+ /**
+ * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random.
+ * (To avoid hitting trashing bad nodes still in slobrok)
+ */
+ int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
+ }
+
+ /** Helper class to get the bucket identifier of a message. */
+ public static class BucketIdCalculator {
+ private static final BucketIdFactory factory = new BucketIdFactory();
+
+ private BucketId getBucketId(Message msg) {
+ switch (msg.getType()) {
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
+ case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
+ case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId();
+ case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0);
+ case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId();
+ default:
+ log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
+ return null;
+ }
+ }
+
+ BucketId handleBucketIdCalculation(RoutingContext context) {
+ BucketId id = getBucketId(context.getMessage());
+ if (id == null || id.getRawId() == 0) {
+ Reply reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
+ context.setReply(reply);
+ }
+ return id;
+ }
+ }
+
+ /** Class handling the logic of picking a distributor */
+ public static class DistributorSelectionLogic {
+ /** Class that tracks a failure of a given type per node. */
+ static class InstabilityChecker {
+ private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
+ private final int failureLimit;
+
+ InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
+
+ boolean tooManyFailures(int nodeIndex) {
+ if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
+ nodeFailures.set(nodeIndex, 0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void addFailure(Integer calculatedDistributor) {
+ while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
+ nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
+ }
+ }
+ /** Message context class. Contains data we want to inspect about a request at reply time. */
+ private static class MessageContext {
+ final Integer calculatedDistributor;
+ final ClusterState usedState;
+
+ MessageContext(ClusterState usedState) {
+ this(usedState, null);
+ }
+ MessageContext(ClusterState usedState, Integer calculatedDistributor) {
+ this.calculatedDistributor = calculatedDistributor;
+ this.usedState = usedState;
+ }
+
+ public String toString() {
+ return "Context(Distributor " + calculatedDistributor +
+ ", state version " + usedState.getVersion() + ")";
+ }
+ }
+
+ private final HostFetcher hostFetcher;
+ private final Distribution distribution;
+ private final InstabilityChecker persistentFailureChecker;
+ private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
+ private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
+ private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
+
+ DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
+ try {
+ hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
+ distribution = params.createDistribution(policy);
+ persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
+ maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
+ } catch (Throwable e) {
+ destroy();
+ throw e;
+ }
+ }
+
+ public void destroy() {
+ if (hostFetcher != null) {
+ hostFetcher.close();
+ }
+ if (distribution != null) {
+ distribution.close();
+ }
+ }
+
+ String getTargetSpec(RoutingContext context, BucketId bucketId) {
+ String sendRandomReason = null;
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+
+ if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
+ try{
+ Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
+ // If we have had too many failures towards existing node, reset failure count and send to random
+ if (persistentFailureChecker.tooManyFailures(target)) {
+ sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
+ target = null;
+ }
+ // If we have found a target, and the target exists in slobrok, send to it.
+ if (target != null) {
+ context.setContext(new MessageContext(cachedClusterState, target));
+ String targetSpec = hostFetcher.getTargetSpec(target, context);
+ if (targetSpec != null) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Using distributor " + target + " for " +
+ bucketId + " as our state version is " + cachedClusterState.getVersion());
+ }
+ return targetSpec;
+ } else {
+ sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
+ log.log(Level.FINE, "Target distributor is not in slobrok");
+ }
+ } else {
+ context.setContext(new MessageContext(cachedClusterState));
+ }
+ } catch (Distribution.TooFewBucketBitsInUseException e) {
+ Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
+ reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
+ "Too few distribution bits used for given cluster state"));
+ context.setReply(reply);
+ return null;
+ } catch (Distribution.NoDistributorsAvailableException e) {
+ log.log(Level.FINE, "No distributors available; clearing cluster state");
+ safeCachedClusterState.set(null);
+ sendRandomReason = "No distributors available. Sending to random distributor.";
+ context.setContext(createRandomDistributorTargetContext());
+ }
+ } else {
+ context.setContext(createRandomDistributorTargetContext());
+ sendRandomReason = "No cluster state cached. Sending to random distributor.";
+ }
+ if (context.shouldTrace(1)) {
+ context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
+ }
+ return hostFetcher.getRandomTargetSpec(context);
+ }
+
+ private static MessageContext createRandomDistributorTargetContext() {
+ return new MessageContext(null);
+ }
+
+ private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
+ try {
+ return Optional.of(new ClusterState(reply.getSystemState()));
+ } catch (Exception e) {
+ reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
+ return Optional.empty();
+ }
+ }
+
+ void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
+ final MessageContext context = (MessageContext) routingContext.getContext();
+ final Optional<ClusterState> replyState = clusterStateFromReply(reply);
+ if (!replyState.isPresent()) {
+ return;
+ }
+ final ClusterState newState = replyState.get();
+ resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
+ markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
+
+ if (context.calculatedDistributor == null) {
+ traceReplyFromRandomDistributor(reply, newState);
+ } else {
+ traceReplyFromSpecificDistributor(reply, context, newState);
+ }
+ updateCachedRoutingStateFromWrongDistribution(context, newState);
+ }
+
+ private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
+ safeCachedClusterState.set(newState);
+ if (newState.getClusterState().equals(State.UP)) {
+ hostFetcher.updateValidTargets(newState);
+ }
+ } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
+ safeCachedClusterState.set(null);
+ } else if (context.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(context.calculatedDistributor);
+ }
+ }
+
+ private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState == null) {
+ String msg = "Used state must be set as distributor is calculated. Bug.";
+ reply.getTrace().trace(1, msg);
+ log.log(Level.SEVERE, msg);
+ } else if (newState.getVersion() == context.usedState.getVersion()) {
+ String msg = "Message sent to distributor " + context.calculatedDistributor +
+ " retrieved cluster state version " + newState.getVersion() +
+ " which was the state we used to calculate distributor as target last time.";
+ reply.getTrace().trace(1, msg);
+ // Client load can be rejected towards distributors even with a matching cluster state version.
+ // This usually happens during a node fail-over transition, where the target distributor will
+ // reject an operation bound to a particular bucket if it does not own the bucket in _both_
+ // the current and the next (transition target) state. Since it can happen during normal operation
+ // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
+ log.log(Level.FINE, msg);
+ } else if (newState.getVersion() > context.usedState.getVersion()) {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " updated cluster state from version " + context.usedState.getVersion() +
+ " to " + newState.getVersion());
+ }
+ } else {
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
+ " returned older cluster state version " + newState.getVersion());
+ }
+ }
+ }
+
+ private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
+ if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
+ oldClusterVersionGottenCount.set(0);
+ safeCachedClusterState.set(null);
+ }
+ }
+ }
+
+ private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
+ if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
+ if (reply.getRetryDelay() <= 0.0) {
+ reply.setRetryDelay(-1);
+ }
+ } else {
+ if (reply.getRetryDelay() <= 0.0) {
+ reply.setRetryDelay(0);
+ }
+ }
+ }
+
+ private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
+ if (!reply.getTrace().shouldTrace(1)) {
+ return;
+ }
+ ClusterState cachedClusterState = safeCachedClusterState.get();
+ if (cachedClusterState == null) {
+ reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
+ } else if (newState.getVersion() == cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
+ } else if (newState.getVersion() > cachedClusterState.getVersion()) {
+ reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
+ } else {
+ reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
+ }
+ }
+
+ void handleErrorReply(Reply reply, Object untypedContext) {
+ MessageContext messageContext = (MessageContext) untypedContext;
+ if (messageContext.calculatedDistributor != null) {
+ persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
+ if (reply.getTrace().shouldTrace(1)) {
+ reply.getTrace().trace(1, "Failed with " + messageContext.toString());
}
- };
+ }
}
}
+ private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
+ private final DistributorSelectionLogic distributorSelectionLogic;
+ private final Parameters parameters;
+
+ /** Constructor used in production. */
+ public ContentPolicy(String param) {
+ this(parse(param));
+ }
+
public ContentPolicy(Map<String, String> params) {
- super(new ContentParameters(params));
+ this(new Parameters(params));
}
- public ContentPolicy(String parameters) {
- this(parse(parameters));
+ /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
+ public ContentPolicy(Parameters p) {
+ super();
+ parameters = p;
+ distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
}
+ @Override
+ public void select(RoutingContext context) {
+ if (context.shouldTrace(1)) {
+ context.trace(1, "Selecting route");
+ }
+
+ BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
+ if (context.hasReply()) return;
+
+ String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
+ if (context.hasReply()) return;
+ if (targetSpec != null) {
+ Route route = new Route(context.getRoute());
+ route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
+ context.addChild(route);
+ } else {
+ context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
+ }
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ RoutingNodeIterator it = context.getChildIterator();
+ Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply();
+ if (reply == null) {
+ reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
+ "No reply in any children, nor in the routing context: " + context));
+ }
+
+ if (reply instanceof WrongDistributionReply) {
+ distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
+ } else if (reply.hasErrors()) {
+ distributorSelectionLogic.handleErrorReply(reply, context.getContext());
+ } else if (reply instanceof WriteDocumentReply) {
+ if (context.shouldTrace(9)) {
+ context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
+ }
+ }
+ context.setReply(reply);
+ }
+
+ @Override
+ public void destroy() {
+ distributorSelectionLogic.destroy();
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index ca32c5722e6..f5b4920fa3f 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -263,7 +263,7 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg));
putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory());
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
- putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory());
+ putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
// Prepare version specifications to use when adding routable factories.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
index 6954d8f3a1d..7b44a1a4f0d 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
@@ -16,15 +16,6 @@ public abstract class RoutingPolicyFactories {
}
}
- static class StoragePolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new StoragePolicy(param);
- }
-
- public void destroy() {
- }
- }
-
static class ContentPolicyFactory implements RoutingPolicyFactory {
public DocumentProtocolRoutingPolicy createPolicy(String param) {
return new ContentPolicy(param);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
deleted file mode 100644
index b74f7431531..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ /dev/null
@@ -1,615 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.document.BucketId;
-import com.yahoo.document.BucketIdFactory;
-import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import java.util.logging.Level;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-import com.yahoo.messagebus.routing.VerbatimDirective;
-import com.yahoo.vdslib.distribution.Distribution;
-import com.yahoo.vdslib.state.ClusterState;
-import com.yahoo.vdslib.state.Node;
-import com.yahoo.vdslib.state.NodeType;
-import com.yahoo.vdslib.state.State;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-/**
- * Routing policy to determine which distributor in a storage cluster to send data to.
- * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to.
- *
- * cluster=[clusterName] (Mandatory, determines the cluster name)
- * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application)
- * clusterconfigid=[id] (Optional, use given config id for distribution instead of default)
- *
- * @author Haakon Humberset
- */
-public class StoragePolicy extends SlobrokPolicy {
-
- private static final Logger log = Logger.getLogger(StoragePolicy.class.getName());
- public static final String owningBucketStates = "uim";
- private static final String upStates = "ui";
-
- /** This class merely generates slobrok a host pattern for a given distributor. */
- public static class SlobrokHostPatternGenerator {
- private final String base;
- private final String all;
- SlobrokHostPatternGenerator(String clusterName) {
- base = "storage/cluster." + clusterName + "/distributor/";
- all = base + "*/default";
-
- }
-
- /**
- * Find host pattern of the hosts that are valid targets for this request.
- * @param distributor Set to -1 if any distributor is valid target.
- */
- public String getDistributorHostPattern(Integer distributor) {
- return (distributor == null) ? all : (base + distributor + "/default");
- }
- }
-
- /** Helper class to match a host pattern with node to use. */
- public abstract static class HostFetcher {
-
- private static class Targets {
- private final List<Integer> list;
- private final int total;
- Targets() {
- this(Collections.emptyList(), 1);
- }
- Targets(List<Integer> list, int total) {
- this.list = list;
- this.total = total;
- }
- }
-
- private final int requiredUpPercentageToSendToKnownGoodNodes;
- private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
- protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.
-
- protected HostFetcher(int percent) {
- requiredUpPercentageToSendToKnownGoodNodes = percent;
- }
-
- void updateValidTargets(ClusterState state) {
- List<Integer> validRandomTargets = new ArrayList<>();
- for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
- if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
- }
- validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR)));
- }
- public abstract String getTargetSpec(Integer distributor, RoutingContext context);
- String getRandomTargetSpec(RoutingContext context) {
- Targets targets = validTargets.get();
- // Try to use list of random targets, if at least X % of the nodes are up
- while ((targets.total != 0) &&
- (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes))
- {
- int randIndex = randomizer.nextInt(targets.list.size());
- String targetSpec = getTargetSpec(targets.list.get(randIndex), context);
- if (targetSpec != null) {
- context.trace(3, "Sending to random node seen up in cluster state");
- return targetSpec;
- }
- targets.list.remove(randIndex);
- }
- context.trace(3, "Too few nodes seen up in state. Sending totally random.");
- return getTargetSpec(null, context);
- }
- public void close() {}
- }
-
- /** Host fetcher using a slobrok mirror to find the hosts. */
- public static class SlobrokHostFetcher extends HostFetcher {
- private final SlobrokHostPatternGenerator patternGenerator;
- private final SlobrokPolicy policy;
-
- SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
- super(percent);
- this.patternGenerator = patternGenerator;
- this.policy = policy;
- }
-
- private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
- return policy.lookup(context, hostPattern);
- }
-
- private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }
-
- public IMirror getMirror(RoutingContext context) { return context.getMirror(); }
-
- @Override
- public String getTargetSpec(Integer distributor, RoutingContext context) {
- List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
- if (arr.isEmpty()) return null;
- if (distributor != null) {
- if (arr.size() == 1) {
- return convertSlobrokNameToSessionName(arr.get(0).getSpecString());
- } else {
- log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor.");
- }
- } else {
- return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString());
- }
- return null;
- }
- }
-
- static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
-
- /**
- * Distributor index to resolved RPC spec cache for a single given Slobrok
- * update generation. Uses a thread safe COW map which will grow until stable.
- */
- private static class GenerationCache {
- private final int generation;
- private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
-
- GenerationCache(int generation) {
- this.generation = generation;
- }
-
- public int generation() { return this.generation; }
-
- public String get(Integer index) {
- return targets.get(index);
- }
- public void put(Integer index, String target) {
- targets.put(index, target);
- }
- }
-
- private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
-
- TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
- super(patternGenerator, policy, percent);
- }
-
- @Override
- public String getTargetSpec(Integer distributor, RoutingContext context) {
- GenerationCache cache = generationCache.get();
- int currentGeneration = getMirror(context).updates();
- // The below code might race with other threads during a generation change. That is OK, as the cache
- // is thread safe and will quickly converge to a stable state for the new generation.
- if (cache == null || currentGeneration != cache.generation()) {
- cache = new GenerationCache(currentGeneration);
- generationCache.set(cache);
- }
- if (distributor != null) {
- return cachingGetTargetSpec(distributor, context, cache);
- }
- // Wildcard lookup case. Must not be cached.
- return super.getTargetSpec(null, context);
- }
-
- private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
- String cachedTarget = cache.get(distributor);
- if (cachedTarget != null) {
- return cachedTarget;
- }
- // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
- // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
- // next invocation.
- String resolvedTarget = super.getTargetSpec(distributor, context);
- cache.put(distributor, resolvedTarget);
- return resolvedTarget;
- }
-
- }
-
- /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
- public static class Parameters {
- protected final String clusterName;
- protected final String distributionConfigId;
- protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
-
- public Parameters(Map<String, String> params) {
- clusterName = params.get("cluster");
- distributionConfigId = params.get("clusterconfigid");
- slobrokHostPatternGenerator = createPatternGenerator();
- if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set");
- }
-
- String getDistributionConfigId() {
- return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId);
- }
- public String getClusterName() {
- return clusterName;
- }
- public SlobrokHostPatternGenerator createPatternGenerator() {
- return new SlobrokHostPatternGenerator(getClusterName());
- }
- public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
- return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
- }
- public Distribution createDistribution(SlobrokPolicy policy) {
- return new Distribution(getDistributionConfigId());
- }
-
- /**
- * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the
- * failure was related to node being bad. (Hard to detect from failure)
- */
- int getAttemptRandomOnFailuresLimit() { return 5; }
-
- /**
- * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the
- * old one. This guards us against version resets.
- */
- int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }
-
- /**
- * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random.
- * (To avoid hitting trashing bad nodes still in slobrok)
- */
- int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
- }
-
- /** Helper class to get the bucket identifier of a message. */
- public static class BucketIdCalculator {
- private static final BucketIdFactory factory = new BucketIdFactory();
-
- private BucketId getBucketId(Message msg) {
- switch (msg.getType()) {
- case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId());
- case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId());
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId());
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId());
- case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId();
- case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId();
- case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0);
- case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId();
- default:
- log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
- return null;
- }
- }
-
- BucketId handleBucketIdCalculation(RoutingContext context) {
- BucketId id = getBucketId(context.getMessage());
- if (id == null || id.getRawId() == 0) {
- Reply reply = new EmptyReply();
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
- context.setReply(reply);
- }
- return id;
- }
- }
-
- /** Class handling the logic of picking a distributor */
- public static class DistributorSelectionLogic {
- /** Class that tracks a failure of a given type per node. */
- static class InstabilityChecker {
- private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
- private final int failureLimit;
-
- InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
-
- boolean tooManyFailures(int nodeIndex) {
- if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
- nodeFailures.set(nodeIndex, 0);
- return true;
- } else {
- return false;
- }
- }
-
- void addFailure(Integer calculatedDistributor) {
- while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
- nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
- }
- }
- /** Message context class. Contains data we want to inspect about a request at reply time. */
- private static class MessageContext {
- final Integer calculatedDistributor;
- final ClusterState usedState;
-
- MessageContext() {
- this(null, null);
- }
- MessageContext(ClusterState usedState) {
- this(usedState, null);
- }
- MessageContext(ClusterState usedState, Integer calculatedDistributor) {
- this.calculatedDistributor = calculatedDistributor;
- this.usedState = usedState;
- }
-
- public String toString() {
- return "Context(Distributor " + calculatedDistributor +
- ", state version " + usedState.getVersion() + ")";
- }
- }
-
- private final HostFetcher hostFetcher;
- private final Distribution distribution;
- private final InstabilityChecker persistentFailureChecker;
- private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
- private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
- private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
-
- DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
- try {
- hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
- distribution = params.createDistribution(policy);
- persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
- maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
- } catch (Throwable e) {
- destroy();
- throw e;
- }
- }
-
- public void destroy() {
- if (hostFetcher != null) {
- hostFetcher.close();
- }
- if (distribution != null) {
- distribution.close();
- }
- }
-
- String getTargetSpec(RoutingContext context, BucketId bucketId) {
- String sendRandomReason = null;
- ClusterState cachedClusterState = safeCachedClusterState.get();
-
- if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
- try{
- Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
- // If we have had too many failures towards existing node, reset failure count and send to random
- if (persistentFailureChecker.tooManyFailures(target)) {
- sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
- target = null;
- }
- // If we have found a target, and the target exists in slobrok, send to it.
- if (target != null) {
- context.setContext(new MessageContext(cachedClusterState, target));
- String targetSpec = hostFetcher.getTargetSpec(target, context);
- if (targetSpec != null) {
- if (context.shouldTrace(1)) {
- context.trace(1, "Using distributor " + target + " for " +
- bucketId + " as our state version is " + cachedClusterState.getVersion());
- }
- return targetSpec;
- } else {
- sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
- log.log(Level.FINE, "Target distributor is not in slobrok");
- }
- } else {
- context.setContext(new MessageContext(cachedClusterState));
- }
- } catch (Distribution.TooFewBucketBitsInUseException e) {
- Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
- reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
- "Too few distribution bits used for given cluster state"));
- context.setReply(reply);
- return null;
- } catch (Distribution.NoDistributorsAvailableException e) {
- log.log(Level.FINE, "No distributors available; clearing cluster state");
- safeCachedClusterState.set(null);
- sendRandomReason = "No distributors available. Sending to random distributor.";
- context.setContext(createRandomDistributorTargetContext());
- }
- } else {
- context.setContext(createRandomDistributorTargetContext());
- sendRandomReason = "No cluster state cached. Sending to random distributor.";
- }
- if (context.shouldTrace(1)) {
- context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
- }
- return hostFetcher.getRandomTargetSpec(context);
- }
-
- private static MessageContext createRandomDistributorTargetContext() {
- return new MessageContext(null);
- }
-
- private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
- try {
- return Optional.of(new ClusterState(reply.getSystemState()));
- } catch (Exception e) {
- reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
- return Optional.empty();
- }
- }
-
- void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
- final MessageContext context = (MessageContext) routingContext.getContext();
- final Optional<ClusterState> replyState = clusterStateFromReply(reply);
- if (!replyState.isPresent()) {
- return;
- }
- final ClusterState newState = replyState.get();
- resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
- markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);
-
- if (context.calculatedDistributor == null) {
- traceReplyFromRandomDistributor(reply, newState);
- } else {
- traceReplyFromSpecificDistributor(reply, context, newState);
- }
- updateCachedRoutingStateFromWrongDistribution(context, newState);
- }
-
- private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
- safeCachedClusterState.set(newState);
- if (newState.getClusterState().equals(State.UP)) {
- hostFetcher.updateValidTargets(newState);
- }
- } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
- safeCachedClusterState.set(null);
- } else if (context.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(context.calculatedDistributor);
- }
- }
-
- private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
- if (context.usedState == null) {
- String msg = "Used state must be set as distributor is calculated. Bug.";
- reply.getTrace().trace(1, msg);
- log.log(Level.SEVERE, msg);
- } else if (newState.getVersion() == context.usedState.getVersion()) {
- String msg = "Message sent to distributor " + context.calculatedDistributor +
- " retrieved cluster state version " + newState.getVersion() +
- " which was the state we used to calculate distributor as target last time.";
- reply.getTrace().trace(1, msg);
- // Client load can be rejected towards distributors even with a matching cluster state version.
- // This usually happens during a node fail-over transition, where the target distributor will
- // reject an operation bound to a particular bucket if it does not own the bucket in _both_
- // the current and the next (transition target) state. Since it can happen during normal operation
- // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
- log.log(Level.FINE, msg);
- } else if (newState.getVersion() > context.usedState.getVersion()) {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " updated cluster state from version " + context.usedState.getVersion() +
- " to " + newState.getVersion());
- }
- } else {
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
- " returned older cluster state version " + newState.getVersion());
- }
- }
- }
-
- private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
- if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
- oldClusterVersionGottenCount.set(0);
- safeCachedClusterState.set(null);
- }
- }
- }
-
- private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
- if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
- if (reply.getRetryDelay() <= 0.0) {
- reply.setRetryDelay(-1);
- }
- } else {
- if (reply.getRetryDelay() <= 0.0) {
- reply.setRetryDelay(0);
- }
- }
- }
-
- private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
- if (!reply.getTrace().shouldTrace(1)) {
- return;
- }
- ClusterState cachedClusterState = safeCachedClusterState.get();
- if (cachedClusterState == null) {
- reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
- } else if (newState.getVersion() == cachedClusterState.getVersion()) {
- reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
- } else if (newState.getVersion() > cachedClusterState.getVersion()) {
- reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
- } else {
- reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
- }
- }
-
- void handleErrorReply(Reply reply, Object untypedContext) {
- MessageContext messageContext = (MessageContext) untypedContext;
- if (messageContext.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
- if (reply.getTrace().shouldTrace(1)) {
- reply.getTrace().trace(1, "Failed with " + messageContext.toString());
- }
- }
- }
- }
-
- private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
- private final DistributorSelectionLogic distributorSelectionLogic;
- private final Parameters parameters;
-
- /** Constructor used in production. */
- public StoragePolicy(String param) {
- this(parse(param));
- }
-
- public StoragePolicy(Map<String, String> params) {
- this(new Parameters(params));
- }
-
- /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
- public StoragePolicy(Parameters p) {
- super();
- parameters = p;
- distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
- }
-
- @Override
- public void select(RoutingContext context) {
- if (context.shouldTrace(1)) {
- context.trace(1, "Selecting route");
- }
-
- BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
- if (context.hasReply()) return;
-
- String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
- if (context.hasReply()) return;
- if (targetSpec != null) {
- Route route = new Route(context.getRoute());
- route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
- context.addChild(route);
- } else {
- context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- RoutingNodeIterator it = context.getChildIterator();
- Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply();
- if (reply == null) {
- reply = new EmptyReply();
- reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "No reply in any children, nor in the routing context: " + context));
- }
-
- if (reply instanceof WrongDistributionReply) {
- distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
- } else if (reply.hasErrors()) {
- distributorSelectionLogic.handleErrorReply(reply, context.getContext());
- } else if (reply instanceof WriteDocumentReply) {
- if (context.shouldTrace(9)) {
- context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
- }
- }
- context.setReply(reply);
- }
-
- @Override
- public void destroy() {
- distributorSelectionLogic.destroy();
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
index cdc5878321a..52afcdfd77c 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
@@ -49,8 +49,8 @@ public class TargetCachingSlobrokHostFetcherTest {
static class Fixture {
SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class);
IMirror mockMirror = mock(IMirror.class);
- StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo");
- StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
+ ContentPolicy.SlobrokHostPatternGenerator patternGenerator = new ContentPolicy.SlobrokHostPatternGenerator("foo");
+ ContentPolicy.TargetCachingSlobrokHostFetcher hostFetcher = new ContentPolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
RoutingContext routingContext = mock(RoutingContext.class);
Fixture() {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
index 8a6df061430..018697c0719 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java
@@ -15,7 +15,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-public class BasicTests extends StoragePolicyTestEnvironment {
+public class BasicTests extends ContentPolicyTestEnvironment {
/** Test that we can send a message through the policy. */
@Test
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
index b0cea8ee819..f324245b612 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
@@ -4,7 +4,7 @@ package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy;
import org.junit.Ignore;
import org.junit.Test;
-public class StoragePolicyTest extends Simulator {
+public class ContentPolicyTest extends Simulator {
/**
* Verify that a resent message with failures doesn't ruin overall performance. (By dumping the cached state too often
* so other requests are sent to wrong target)
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
index 00a045367bb..479e0b0f422 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
@@ -10,7 +10,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy;
import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory;
-import com.yahoo.documentapi.messagebus.protocol.StoragePolicy;
+import com.yahoo.documentapi.messagebus.protocol.ContentPolicy;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.documentapi.messagebus.protocol.test.PolicyTestFrame;
import com.yahoo.messagebus.EmptyReply;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public abstract class StoragePolicyTestEnvironment {
+public abstract class ContentPolicyTestEnvironment {
protected StoragePolicyTestFactory policyFactory;
protected PolicyTestFrame frame;
@@ -102,7 +102,7 @@ public abstract class StoragePolicyTestEnvironment {
assertTrue(nodes.remove(second));
}
- public static class TestHostFetcher extends StoragePolicy.HostFetcher {
+ public static class TestHostFetcher extends ContentPolicy.HostFetcher {
private final String clusterName;
private RandomGen randomizer = new RandomGen(1234);
private final Set<Integer> nodes;
@@ -143,7 +143,7 @@ public abstract class StoragePolicyTestEnvironment {
}
}
- public static class TestParameters extends StoragePolicy.Parameters {
+ public static class TestParameters extends ContentPolicy.Parameters {
private final TestHostFetcher hostFetcher;
private final Distribution distribution;
@@ -154,7 +154,7 @@ public abstract class StoragePolicyTestEnvironment {
}
@Override
- public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; }
+ public ContentPolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; }
@Override
public Distribution createDistribution(SlobrokPolicy policy) { return distribution; }
@@ -171,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment {
public DocumentProtocolRoutingPolicy createPolicy(String parameters) {
parameterInstances.addLast(new TestParameters(parameters, nodes));
((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom);
- return new StoragePolicy(parameterInstances.getLast());
+ return new ContentPolicy(parameterInstances.getLast());
}
public void avoidPickingAtRandom(Integer distributor) {
avoidPickingAtRandom = distributor;
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
index 68405b002c8..d23dd9ea998 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
@@ -5,7 +5,7 @@ import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.document.DocumentId;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.StoragePolicy;
+import com.yahoo.documentapi.messagebus.protocol.ContentPolicy;
import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.vdslib.distribution.RandomGen;
import com.yahoo.vdslib.state.ClusterState;
@@ -21,7 +21,7 @@ import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public abstract class Simulator extends StoragePolicyTestEnvironment {
+public abstract class Simulator extends ContentPolicyTestEnvironment {
enum FailureType {
TRANSIENT_ERROR,
@@ -175,7 +175,7 @@ public abstract class Simulator extends StoragePolicyTestEnvironment {
for (int i=0; i<params.getParallellRequests(); ++i) {
RoutingNode target = targets[i];
int index = getAddress(target).getSecond();
- if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(StoragePolicy.owningBucketStates)) {
+ if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(ContentPolicy.owningBucketStates)) {
++downnode[half];
}
BadNode badNode = params.getBadNodes().get(index);
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index 0c659f589d6..02bd6b297d0 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -4,13 +4,13 @@
#include <vespa/documentapi/documentapi.h>
#include <vespa/documentapi/messagebus/policies/andpolicy.h>
+#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
#include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h>
#include <vespa/documentapi/messagebus/policies/errorpolicy.h>
#include <vespa/documentapi/messagebus/policies/externpolicy.h>
#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
-#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/routing/routingnode.h>
@@ -51,7 +51,7 @@ private:
private:
bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected);
void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1);
- StoragePolicy &setupStoragePolicy(TestFrame &frame, const string &param,
+ ContentPolicy &setupContentPolicy(TestFrame &frame, const string &param,
const string &pattern = "", int32_t numEntries = -1);
bool isErrorPolicy(const string &name, const string &param);
void assertMirrorReady(const IMirrorAPI &mirror);
@@ -83,10 +83,10 @@ public:
void requireThatExternPolicyWithUnknownPatternSelectsNone();
void requireThatExternPolicySelectsFromExternSlobrok();
void requireThatExternPolicyMergesOneReplyAsProtocol();
- void requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy();
- void requireThatStoragePolicyIsRandomWithoutState();
- void requireThatStoragePolicyIsTargetedWithState();
- void requireThatStoragePolicyCombinesSystemAndSlobrokState();
+ void requireThatContentPolicyWithIllegalParamIsAnErrorPolicy();
+ void requireThatContentPolicyIsRandomWithoutState();
+ void requireThatContentPolicyIsTargetedWithState();
+ void requireThatContentPolicyCombinesSystemAndSlobrokState();
};
TEST_APPHOOK(Test);
@@ -128,10 +128,10 @@ Test::Main() {
requireThatExternPolicySelectsFromExternSlobrok(); TEST_FLUSH();
requireThatExternPolicyMergesOneReplyAsProtocol(); TEST_FLUSH();
- requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH();
- requireThatStoragePolicyIsRandomWithoutState(); TEST_FLUSH();
- requireThatStoragePolicyIsTargetedWithState(); TEST_FLUSH();
- requireThatStoragePolicyCombinesSystemAndSlobrokState(); TEST_FLUSH();
+ requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH();
+ requireThatContentPolicyIsRandomWithoutState(); TEST_FLUSH();
+ requireThatContentPolicyIsTargetedWithState(); TEST_FLUSH();
+ requireThatContentPolicyCombinesSystemAndSlobrokState(); TEST_FLUSH();
TEST_DONE();
}
@@ -782,15 +782,15 @@ void Test::testLoadBalancer() {
}
void
-Test::requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy()
+Test::requireThatContentPolicyWithIllegalParamIsAnErrorPolicy()
{
- EXPECT_TRUE(isErrorPolicy("Storage", ""));
- EXPECT_TRUE(isErrorPolicy("Storage", "config=foo;slobroks=foo"));
- EXPECT_TRUE(isErrorPolicy("Storage", "slobroks=foo"));
+ EXPECT_TRUE(isErrorPolicy("Content", ""));
+ EXPECT_TRUE(isErrorPolicy("Content", "config=foo;slobroks=foo"));
+ EXPECT_TRUE(isErrorPolicy("Content", "slobroks=foo"));
}
void
-Test::requireThatStoragePolicyIsRandomWithoutState()
+Test::requireThatContentPolicyIsRandomWithoutState()
{
TestFrame frame(_repo);
frame.setMessage(newPutDocumentMessage("id:ns:testdoc::"));
@@ -808,7 +808,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState()
string param = vespalib::make_string(
"cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit",
slobrok.port(), getDefaultDistributionConfig(2, 5).c_str());
- StoragePolicy &policy = setupStoragePolicy(
+ ContentPolicy &policy = setupContentPolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
ASSERT_TRUE(policy.getSystemState() == nullptr);
@@ -826,15 +826,15 @@ Test::requireThatStoragePolicyIsRandomWithoutState()
}
}
-StoragePolicy &
-Test::setupStoragePolicy(TestFrame &frame, const string &param,
+ContentPolicy &
+Test::setupContentPolicy(TestFrame &frame, const string &param,
const string &pattern, int32_t numEntries)
{
- frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Storage:%s]", param.c_str())));
+ frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Content:%s]", param.c_str())));
mbus::MessageBus &mbus = frame.getMessageBus();
const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test");
const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0));
- StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME,
+ ContentPolicy &policy = static_cast<ContentPolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME,
dir.getName(), dir.getParam()));
policy.initSynchronous();
assertMirrorReady(*policy.getMirror());
@@ -845,7 +845,7 @@ Test::setupStoragePolicy(TestFrame &frame, const string &param,
}
void
-Test::requireThatStoragePolicyIsTargetedWithState()
+Test::requireThatContentPolicyIsTargetedWithState()
{
TestFrame frame(_repo);
frame.setMessage(newPutDocumentMessage("id:ns:testdoc::"));
@@ -863,7 +863,7 @@ Test::requireThatStoragePolicyIsTargetedWithState()
string param = vespalib::make_string(
"cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit",
slobrok.port(), getDefaultDistributionConfig(2, 5).c_str());
- StoragePolicy &policy = setupStoragePolicy(
+ ContentPolicy &policy = setupContentPolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
ASSERT_TRUE(policy.getSystemState() == nullptr);
@@ -888,7 +888,7 @@ Test::requireThatStoragePolicyIsTargetedWithState()
}
void
-Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
+Test::requireThatContentPolicyCombinesSystemAndSlobrokState()
{
TestFrame frame(_repo);
frame.setMessage(newPutDocumentMessage("id:ns:testdoc::"));
@@ -902,7 +902,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
string param = vespalib::make_string(
"cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit",
slobrok.port(), getDefaultDistributionConfig(2, 5).c_str());
- StoragePolicy &policy = setupStoragePolicy(
+ ContentPolicy &policy = setupContentPolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 1);
ASSERT_TRUE(policy.getSystemState() == nullptr);
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index 560f2f28f0e..0f86eb38aca 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo,
// When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now.
putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>());
putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>());
- putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>());
+ putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO Vespa 8: remove
putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg));
putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>());
+ putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>());
putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>());
+ putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>());
putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>());
- putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>());
putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>());
- putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>());
// Prepare version specifications to use when adding routable factories.
vespalib::VersionSpecification version6(6, 221);
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
index 26d51e702e9..83e1df02a24 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
@@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT
SOURCES
andpolicy.cpp
externslobrokpolicy.cpp
- storagepolicy.cpp
contentpolicy.cpp
messagetypepolicy.cpp
documentrouteselectorpolicy.cpp
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
index aea393a60af..7150794653f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
@@ -1,12 +1,79 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "contentpolicy.h"
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/error.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config/subscription/configuri.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".contentpolicy");
+
+using vespalib::make_string;
namespace documentapi {
ContentPolicy::ContentPolicy(const string& param)
- : StoragePolicy(param)
-{ }
+ : ExternSlobrokPolicy(parse(param)),
+ _bucketIdFactory()
+{
+ std::map<string, string> params(parse(param));
+
+ if (params.find("cluster") != params.end()) {
+ _clusterName = params.find("cluster")->second;
+ } else {
+ _error = "Required parameter clustername not set";
+ }
+
+ if (params.find("clusterconfigid") != params.end()) {
+ _clusterConfigId = params.find("clusterconfigid")->second;
+ }
+}
+
+namespace {
+ class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
+ {
+ public:
+ CallBack(ContentPolicy & policy) : _policy(policy) { }
+ void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
+ _policy.configure(std::move(config));
+ }
+ private:
+ ContentPolicy & _policy;
+ };
+}
+string ContentPolicy::init()
+{
+ string error = ExternSlobrokPolicy::init();
+ if (!error.empty()) {
+ return error;
+ }
+
+ if (_clusterConfigId.empty()) {
+ _clusterConfigId = createConfigId(_clusterName);
+ }
+
+ using storage::lib::Distribution;
+ config::ConfigUri uri(_clusterConfigId);
+ if (!_configSources.empty()) {
+ _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources)));
+ } else {
+ _configFetcher.reset(new config::ConfigFetcher(uri.getContext()));
+ }
+ _callBack = std::make_unique<CallBack>(*this);
+ _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get()));
+ _configFetcher->start();
+ return "";
+}
+
+ContentPolicy::~ContentPolicy() = default;
string
ContentPolicy::createConfigId(const string & clusterName) const
@@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const
return clusterName;
}
+string
+ContentPolicy::createPattern(const string & clusterName, int distributor) const
+{
+ vespalib::asciistream ost;
+
+ ost << "storage/cluster." << clusterName << "/distributor/";
+
+ if (distributor == -1) {
+ ost << '*';
+ } else {
+ ost << distributor;
+ }
+ ost << "/default";
+ return ost.str();
+}
+
+void
+ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
+{
+ try {
+ _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
+ } catch (const std::exception& e) {
+ LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
+ throw e;
+ }
+}
+
+void
+ContentPolicy::doSelect(mbus::RoutingContext &context)
+{
+ const mbus::Message &msg = context.getMessage();
+
+ int distributor = -1;
+
+ if (_state.get()) {
+ document::BucketId id;
+ switch(msg.getType()) {
+ case DocumentProtocol::MESSAGE_PUTDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId());
+ break;
+
+ case DocumentProtocol::MESSAGE_GETDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId());
+ break;
+
+ case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId());
+ break;
+
+ case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId());
+ break;
+
+ case DocumentProtocol::MESSAGE_STATBUCKET:
+ id = static_cast<const StatBucketMessage&>(msg).getBucketId();
+ break;
+
+ case DocumentProtocol::MESSAGE_GETBUCKETLIST:
+ id = static_cast<const GetBucketListMessage&>(msg).getBucketId();
+ break;
+
+ case DocumentProtocol::MESSAGE_CREATEVISITOR:
+ id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0];
+ break;
+
+ case DocumentProtocol::MESSAGE_REMOVELOCATION:
+ id = static_cast<const RemoveLocationMessage&>(msg).getBucketId();
+ break;
+
+ default:
+ LOG(error, "Message type '%d' not supported.", msg.getType());
+ return;
+ }
+
+ // _P_A_R_A_N_O_I_A_
+ if (id.getRawId() == 0) {
+ mbus::Reply::UP reply(new mbus::EmptyReply());
+ reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR,
+ "No bucket id available in message."));
+ context.setReply(std::move(reply));
+ return;
+ }
+
+ // Pick a distributor using ideal state algorithm
+ try {
+ // Update distribution here, to make it not take lock in average case
+ if (_nextDistribution) {
+ _distribution = std::move(_nextDistribution);
+ _nextDistribution.reset();
+ }
+ assert(_distribution.get());
+ distributor = _distribution->getIdealDistributorNode(*_state, id);
+ } catch (storage::lib::TooFewBucketBitsInUseException& e) {
+ auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
+ reply->addError(mbus::Error(
+ DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
+ "Too few distribution bits used for given cluster state"));
+ context.setReply(std::move(reply));
+ return;
+ } catch (storage::lib::NoDistributorsAvailableException& e) {
+ // No distributors available in current cluster state. Remove
+ // cluster state we cannot use and send to random target
+ _state.reset();
+ distributor = -1;
+ }
+ }
+
+ mbus::Hop hop = getRecipient(context, distributor);
+
+ if (distributor != -1 && !hop.hasDirectives()) {
+ hop = getRecipient(context, -1);
+ }
+
+ if (hop.hasDirectives()) {
+ mbus::Route route = context.getRoute();
+ route.setHop(0, hop);
+ context.addChild(route);
+ } else {
+ context.setError(
+ mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE,
+ make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str()));
+ }
+}
+
+mbus::Hop
+ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor)
+{
+ slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor));
+
+ if (!entries.empty()) {
+ return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
+ }
+
+ return mbus::Hop();
+}
+
+void
+ContentPolicy::merge(mbus::RoutingContext &context)
+{
+ mbus::RoutingNodeIterator it = context.getChildIterator();
+ mbus::Reply::UP reply = it.removeReply();
+
+ if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
+ updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
+ } else if (reply->hasErrors()) {
+ _state.reset();
+ }
+
+ context.setReply(std::move(reply));
+}
+
+void
+ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
+{
+ std::unique_ptr<storage::lib::ClusterState> newState(
+ new storage::lib::ClusterState(wdr.getSystemState()));
+ if (!_state || newState->getVersion() >= _state->getVersion()) {
+ if (_state) {
+ wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
+ _state->getVersion(), newState->getVersion()));
+ } else {
+ wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion()));
+ }
+
+ _state = std::move(newState);
+ } else {
+ wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, "
+ "while old state had version %d. New states should not have a lower version than the old.",
+ newState->getVersion(), _state->getVersion()));
+ _state.reset();
+ }
+}
+
} // documentapi
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
index 4b2f356c740..e29fbb75524 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
@@ -1,17 +1,62 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "storagepolicy.h"
+#include "externslobrokpolicy.h"
+#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/messagebus/routing/hop.h>
+#include <vespa/config/helper/ifetchercallback.h>
+#include <vespa/config/helper/configfetcher.h>
+
+namespace config {
+ class ICallback;
+ class ConfigFetcher;
+}
+
+namespace storage {
+namespace lib {
+ class Distribution;
+ class ClusterState;
+}
+}
namespace documentapi {
-class ContentPolicy : public StoragePolicy
+class ContentPolicy : public ExternSlobrokPolicy
{
+private:
+ document::BucketIdFactory _bucketIdFactory;
+ std::unique_ptr<storage::lib::ClusterState> _state;
+ string _clusterName;
+ string _clusterConfigId;
+ std::unique_ptr<config::ICallback> _callBack;
+ std::unique_ptr<config::ConfigFetcher> _configFetcher;
+ std::unique_ptr<storage::lib::Distribution> _distribution;
+ std::unique_ptr<storage::lib::Distribution> _nextDistribution;
+
+ mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
+
public:
ContentPolicy(const string& param);
+ ~ContentPolicy();
+ void doSelect(mbus::RoutingContext &context) override;
+ void merge(mbus::RoutingContext &context) override;
+
+ void updateStateFromReply(WrongDistributionReply& reply);
+
+ /**
+ * @return a pointer to the system state registered with this policy. If
+ * we haven't received a system state yet, returns NULL.
+ */
+ const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
+
+ virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
+ string init() override;
+
private:
- string createConfigId(const string & clusterName) const override;
+ string createConfigId(const string & clusterName) const;
+ string createPattern(const string & clusterName, int distributor) const;
};
}
-
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
deleted file mode 100644
index 3fc1df0352a..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storagepolicy.h"
-#include <vespa/document/base/documentid.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/error.h>
-#include <vespa/documentapi/documentapi.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/config-stor-distribution.h>
-#include <vespa/config/subscription/configuri.h>
-#include <cassert>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".storagepolicy");
-
-using vespalib::make_string;
-
-namespace documentapi {
-
-StoragePolicy::StoragePolicy(const string& param)
- : ExternSlobrokPolicy(parse(param)),
- _bucketIdFactory()
-{
- std::map<string, string> params(parse(param));
-
- if (params.find("cluster") != params.end()) {
- _clusterName = params.find("cluster")->second;
- } else {
- _error = "Required parameter clustername not set";
- }
-
- if (params.find("clusterconfigid") != params.end()) {
- _clusterConfigId = params.find("clusterconfigid")->second;
- }
-}
-
-namespace {
- class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
- {
- public:
- CallBack(StoragePolicy & policy) : _policy(policy) { }
- void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
- _policy.configure(std::move(config));
- }
- private:
- StoragePolicy & _policy;
- };
-}
-string StoragePolicy::init()
-{
- string error = ExternSlobrokPolicy::init();
- if (!error.empty()) {
- return error;
- }
-
- if (_clusterConfigId.empty()) {
- _clusterConfigId = createConfigId(_clusterName);
- }
-
- using storage::lib::Distribution;
- config::ConfigUri uri(_clusterConfigId);
- if (!_configSources.empty()) {
- _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources)));
- } else {
- _configFetcher.reset(new config::ConfigFetcher(uri.getContext()));
- }
- _callBack = std::make_unique<CallBack>(*this);
- _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get()));
- _configFetcher->start();
- return "";
-}
-
-StoragePolicy::~StoragePolicy() = default;
-
-string
-StoragePolicy::createConfigId(const string & clusterName) const
-{
- return "storage/cluster." + clusterName;
-}
-
-string
-StoragePolicy::createPattern(const string & clusterName, int distributor) const
-{
- vespalib::asciistream ost;
-
- ost << "storage/cluster." << clusterName << "/distributor/";
-
- if (distributor == -1) {
- ost << '*';
- } else {
- ost << distributor;
- }
- ost << "/default";
- return ost.str();
-}
-
-void
-StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
-{
- try {
- _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
- } catch (const std::exception& e) {
- LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
- throw e;
- }
-}
-
-void
-StoragePolicy::doSelect(mbus::RoutingContext &context)
-{
- const mbus::Message &msg = context.getMessage();
-
- int distributor = -1;
-
- if (_state.get()) {
- document::BucketId id;
- switch(msg.getType()) {
- case DocumentProtocol::MESSAGE_PUTDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId());
- break;
-
- case DocumentProtocol::MESSAGE_GETDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId());
- break;
-
- case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId());
- break;
-
- case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId());
- break;
-
- case DocumentProtocol::MESSAGE_STATBUCKET:
- id = static_cast<const StatBucketMessage&>(msg).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_GETBUCKETLIST:
- id = static_cast<const GetBucketListMessage&>(msg).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_CREATEVISITOR:
- id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0];
- break;
-
- case DocumentProtocol::MESSAGE_REMOVELOCATION:
- id = static_cast<const RemoveLocationMessage&>(msg).getBucketId();
- break;
-
- default:
- LOG(error, "Message type '%d' not supported.", msg.getType());
- return;
- }
-
- // _P_A_R_A_N_O_I_A_
- if (id.getRawId() == 0) {
- mbus::Reply::UP reply(new mbus::EmptyReply());
- reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR,
- "No bucket id available in message."));
- context.setReply(std::move(reply));
- return;
- }
-
- // Pick a distributor using ideal state algorithm
- try {
- // Update distribution here, to make it not take lock in average case
- if (_nextDistribution) {
- _distribution = std::move(_nextDistribution);
- _nextDistribution.reset();
- }
- assert(_distribution.get());
- distributor = _distribution->getIdealDistributorNode(*_state, id);
- } catch (storage::lib::TooFewBucketBitsInUseException& e) {
- auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
- reply->addError(mbus::Error(
- DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
- "Too few distribution bits used for given cluster state"));
- context.setReply(std::move(reply));
- return;
- } catch (storage::lib::NoDistributorsAvailableException& e) {
- // No distributors available in current cluster state. Remove
- // cluster state we cannot use and send to random target
- _state.reset();
- distributor = -1;
- }
- }
-
- mbus::Hop hop = getRecipient(context, distributor);
-
- if (distributor != -1 && !hop.hasDirectives()) {
- hop = getRecipient(context, -1);
- }
-
- if (hop.hasDirectives()) {
- mbus::Route route = context.getRoute();
- route.setHop(0, hop);
- context.addChild(route);
- } else {
- context.setError(
- mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE,
- make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str()));
- }
-}
-
-mbus::Hop
-StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor)
-{
- slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor));
-
- if (!entries.empty()) {
- return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
- }
-
- return mbus::Hop();
-}
-
-void
-StoragePolicy::merge(mbus::RoutingContext &context)
-{
- mbus::RoutingNodeIterator it = context.getChildIterator();
- mbus::Reply::UP reply = it.removeReply();
-
- if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
- updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
- } else if (reply->hasErrors()) {
- _state.reset();
- }
-
- context.setReply(std::move(reply));
-}
-
-void
-StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr)
-{
- std::unique_ptr<storage::lib::ClusterState> newState(
- new storage::lib::ClusterState(wdr.getSystemState()));
- if (!_state || newState->getVersion() >= _state->getVersion()) {
- if (_state) {
- wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
- _state->getVersion(), newState->getVersion()));
- } else {
- wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion()));
- }
-
- _state = std::move(newState);
- } else {
- wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, "
- "while old state had version %d. New states should not have a lower version than the old.",
- newState->getVersion(), _state->getVersion()));
- _state.reset();
- }
-}
-
-} // documentapi
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h
deleted file mode 100644
index 5cd2efcbbd3..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "externslobrokpolicy.h"
-#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/document/bucket/bucketidfactory.h>
-#include <vespa/messagebus/routing/hop.h>
-#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/config/helper/configfetcher.h>
-
-namespace config {
- class ICallback;
- class ConfigFetcher;
-}
-
-namespace storage {
-namespace lib {
- class Distribution;
- class ClusterState;
-}
-}
-
-namespace documentapi {
-
-class StoragePolicy : public ExternSlobrokPolicy
-{
-private:
- document::BucketIdFactory _bucketIdFactory;
- std::unique_ptr<storage::lib::ClusterState> _state;
- string _clusterName;
- string _clusterConfigId;
- std::unique_ptr<config::ICallback> _callBack;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- std::unique_ptr<storage::lib::Distribution> _distribution;
- std::unique_ptr<storage::lib::Distribution> _nextDistribution;
-
- mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
-
-public:
- StoragePolicy(const string& param);
- ~StoragePolicy();
- void doSelect(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-
- void updateStateFromReply(WrongDistributionReply& reply);
-
- /**
- * @return a pointer to the system state registered with this policy. If
- * we haven't received a system state yet, returns NULL.
- */
- const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
-
- virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
- string init() override;
-
-private:
- virtual string createConfigId(const string & clusterName) const;
- string createPattern(const string & clusterName, int distributor) const;
-};
-
-}
-
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
index 2c244c63046..f945fe8cd02 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
@@ -1,16 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "routingpolicyfactories.h"
#include <vespa/documentapi/messagebus/policies/andpolicy.h>
+#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
#include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h>
#include <vespa/documentapi/messagebus/policies/errorpolicy.h>
#include <vespa/documentapi/messagebus/policies/externpolicy.h>
+#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
+#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
-#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
-#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
-#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h>
-#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
using namespace documentapi;
@@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string &param) cons
}
mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string &param) const
-{
- mbus::IRoutingPolicy::UP ret(new StoragePolicy(param));
- string error = static_cast<StoragePolicy&>(*ret).getError();
- if (!error.empty()) {
- ret.reset(new ErrorPolicy(error));
- }
- return ret;
-}
-
-mbus::IRoutingPolicy::UP
RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string &param) const
{
return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param));
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
index e2bf5119c58..533ad93e644 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
@@ -16,10 +16,6 @@ public:
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
};
- class StoragePolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
class MessageTypePolicyFactory : public IRoutingPolicyFactory {
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;