author    Tor Brede Vekterli <vekterli@yahooinc.com>    2021-12-20 14:38:29 +0100
committer Tor Brede Vekterli <vekterli@yahooinc.com>    2021-12-20 15:07:33 +0100
commit    e1ba0a6cadfa68430daa44430c3e315ceb0c2598 (patch)
tree      1ab868416deccd84a83eec7c24820421b828c878 /documentapi
parent    a928a12f477ac3e976cd04e1512c15aff14e3138 (diff)
Don't trigger implicit ContentPolicy random send fallback on expected transient errors
The `ContentPolicy` has a failure handling policy where more than _n_ error replies (a small number in practice) will trigger an implicit random send instead of using the cached cluster state. This forces rediscovery of the actual cluster state, and is useful when a node is bad but we're not sending feed to enough other nodes to figure that out from them.

However, certain error codes may be used frequently by the content layer for purposes that do _not_ indicate that a change in cluster state may have happened, and should therefore not be counted as errors that may indicate a bad node:

* `ERROR_TEST_AND_SET_CONDITION_FAILED`: may happen for any mutating operation that has an associated TaS condition. Technically an `APP_FATAL_ERROR`, since resending doesn't make sense.
* `ERROR_BUSY`: may happen for concurrent mutations, and if distributors are in the process of changing bucket ownership and the grace period hasn't passed yet. Also sent if queues are full and the client policy should back off a bit.

None of these are errors per se.
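As a rough sketch of the filtering idea described above (not the patch itself, which operates on whole `Reply` objects inside `ContentPolicy`), the snippet below shows a predicate over a single MessageBus error code. The `DocumentProtocol` constants are the real ones referenced in the commit; the surrounding class and method names are illustrative stand-ins only.

```java
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;

// Illustrative sketch only: decides whether a single error code should count toward
// the ContentPolicy's "too many failures -> random send" fallback.
final class RandomSendFallbackFilter {

    static boolean countsTowardFallback(int errorCode) {
        switch (errorCode) {
            case DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED: // expected for conditional (TaS) mutations
            case DocumentProtocol.ERROR_BUSY:                          // expected under contention / ownership handover
                return false; // transient and expected; does not hint at a stale cluster state
            default:
                return true;  // anything else may indicate a bad node or stale state
        }
    }
}
```

In the patch, this decision is made per error reply in `handleErrorReply()`, which only calls `InstabilityChecker.addFailure()` when the reply's error is deemed to count.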
Diffstat (limited to 'documentapi')
-rw-r--r--  documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java | 96
-rw-r--r--  documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java | 44
-rw-r--r--  documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java | 28
-rw-r--r--  documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java | 2
4 files changed, 144 insertions(+), 26 deletions(-)
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index 5eaec70ca59..21e621883fe 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -232,6 +232,47 @@ public class ContentPolicy extends SlobrokPolicy {
}
+ /**
+ * Tracks "instability" across nodes based on number of failures received versus some
+ * implementation-specific limit.
+ *
+ * Implementations must be thread-safe.
+ *
+ * TODO should ideally be protected, but there's a package mismatch between policy classes and its tests
+ */
+ public interface InstabilityChecker {
+ boolean tooManyFailures(int nodeIndex);
+ void addFailure(Integer calculatedDistributor);
+ }
+
+ /** Class that tracks a failure of a given type per node. */
+ public static class PerNodeCountingInstabilityChecker implements InstabilityChecker {
+ private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
+ private final int failureLimit;
+
+ public PerNodeCountingInstabilityChecker(int failureLimit) {
+ this.failureLimit = failureLimit;
+ }
+
+ @Override
+ public boolean tooManyFailures(int nodeIndex) {
+ if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
+ nodeFailures.set(nodeIndex, 0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void addFailure(Integer calculatedDistributor) {
+ while (nodeFailures.size() <= calculatedDistributor) {
+ nodeFailures.add(0);
+ }
+ nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
+ }
+ }
+
/** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
public static class Parameters {
@@ -271,6 +312,9 @@ public class ContentPolicy extends SlobrokPolicy {
return distributionConfig == null ? new Distribution(getDistributionConfigId())
: new Distribution(distributionConfig.cluster(clusterName));
}
+ public InstabilityChecker createInstabilityChecker() {
+ return new PerNodeCountingInstabilityChecker(getAttemptRandomOnFailuresLimit());
+ }
/**
* When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the
@@ -324,27 +368,6 @@ public class ContentPolicy extends SlobrokPolicy {
/** Class handling the logic of picking a distributor */
public static class DistributorSelectionLogic {
- /** Class that tracks a failure of a given type per node. */
- static class InstabilityChecker {
- private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
- private final int failureLimit;
-
- InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; }
-
- boolean tooManyFailures(int nodeIndex) {
- if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
- nodeFailures.set(nodeIndex, 0);
- return true;
- } else {
- return false;
- }
- }
-
- void addFailure(Integer calculatedDistributor) {
- while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0);
- nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
- }
- }
/** Message context class. Contains data we want to inspect about a request at reply time. */
private static class MessageContext {
final Integer calculatedDistributor;
@@ -375,7 +398,7 @@ public class ContentPolicy extends SlobrokPolicy {
try {
hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
distribution = params.createDistribution(policy);
- persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
+ persistentFailureChecker = params.createInstabilityChecker();
maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
} catch (Throwable e) {
destroy();
@@ -556,10 +579,37 @@ public class ContentPolicy extends SlobrokPolicy {
}
}
+ /**
+ * Returns whether a given error Reply should be counted towards potentially ignoring the cached
+ * cluster state and triggering a random send (and thus likely WrongDistributionReply with the
+ * current cluster state). Certain error codes may be used frequently by the content layer for
+ * purposes that do _not_ indicate that a change in cluster state may have happened, and should
+ * therefore not be counted for this purpose:
+ * - ERROR_TEST_AND_SET_CONDITION_FAILED: may happen for any mutating operation that has an
+ * associated TaS condition. Technically an APP_FATAL_ERROR since resending doesn't make sense.
+ * - ERROR_BUSY: may happen for concurrent mutations and if distributors are in the process of
+ * changing bucket ownership and the grace period hasn't passed yet.
+ */
+ private static boolean shouldCountAsErrorForRandomSendTrigger(Reply reply) {
+ if (reply.getNumErrors() != 1) {
+ return reply.hasErrors(); // For simplicity, count any reply with > 1 error as an error.
+ }
+ var error = reply.getError(0);
+ switch (error.getCode()) {
+ // TODO this feels like a layering violation, but we use DocumentProtocol directly in other places in this policy anyway...
+ case DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED:
+ case DocumentProtocol.ERROR_BUSY:
+ return false;
+ default: return true;
+ }
+ }
+
void handleErrorReply(Reply reply, Object untypedContext) {
MessageContext messageContext = (MessageContext) untypedContext;
if (messageContext.calculatedDistributor != null) {
- persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
+ if (shouldCountAsErrorForRandomSendTrigger(reply)) {
+ persistentFailureChecker.addFailure(messageContext.calculatedDistributor);
+ }
if (reply.getTrace().shouldTrace(1)) {
reply.getTrace().trace(1, "Failed with " + messageContext.toString());
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
index 42625cf193d..c160f67aa9b 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java
@@ -1,9 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.routing.RoutingNode;
import org.junit.Ignore;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class ContentPolicyTest extends Simulator {
/**
@@ -129,6 +133,46 @@ public class ContentPolicyTest extends Simulator {
// Note that we use extra requests here as with only 200 requests there was a pretty good chance of not going to any down node on random anyhow.
}
+ private void setUpSingleNodeFixturesWithInitializedPolicy() {
+ setClusterNodes(new int[]{ 0 });
+ // Seed policy with initial, correct cluster state.
+ {
+ RoutingNode target = select();
+ replyWrongDistribution(target, "foo", null, "version:1234 distributor:1 storage:1");
+ }
+ }
+
+ @Test
+ public void transient_errors_expected_during_normal_feed_are_not_counted_as_errors_that_may_trigger_random_send() {
+ setUpSingleNodeFixturesWithInitializedPolicy();
+ var checker = policyFactory.getLastParameters().instabilityChecker;
+ assertEquals(0, checker.recordedFailures); // WrongDistributionReply not counted as regular error
+ {
+ frame.setMessage(createMessage("id:ns:testdoc:n=2:foo"));
+ RoutingNode target = select();
+ replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_BUSY, "Get busy livin' or get busy resendin'"));
+ }
+ assertEquals(0, checker.recordedFailures); // BUSY not counted as error
+ {
+ frame.setMessage(createMessage("id:ns:testdoc:n=3:foo"));
+ RoutingNode target = select();
+ replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED, "oh no"));
+ }
+ assertEquals(0, checker.recordedFailures); // TaS failures not counted as error
+ }
+
+ @Test
+ public void other_errors_during_feed_are_counted_as_errors_that_may_trigger_random_send() {
+ setUpSingleNodeFixturesWithInitializedPolicy();
+ var checker = policyFactory.getLastParameters().instabilityChecker;
+ {
+ frame.setMessage(createMessage("id:ns:testdoc:n=1:foo"));
+ RoutingNode target = select();
+ replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_ABORTED, "shop's closing, go home"));
+ }
+ assertEquals(1, checker.recordedFailures);
+ }
+
// Left to test?
// Cluster state down - Not overwrite last good nodes to send random to?
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
index f0c54505bb1..b6893d69325 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java
@@ -145,14 +145,37 @@ public abstract class ContentPolicyTestEnvironment {
}
}
+ public static class TestWrappingInstabilityChecker implements ContentPolicy.InstabilityChecker {
+
+ public int recordedFailures = 0;
+ private final ContentPolicy.InstabilityChecker fwdChecker;
+
+ TestWrappingInstabilityChecker(ContentPolicy.InstabilityChecker fwdChecker) {
+ this.fwdChecker = fwdChecker;
+ }
+
+ @Override
+ public boolean tooManyFailures(int nodeIndex) {
+ return fwdChecker.tooManyFailures(nodeIndex);
+ }
+
+ @Override
+ public void addFailure(Integer calculatedDistributor) {
+ ++recordedFailures;
+ fwdChecker.addFailure(calculatedDistributor);
+ }
+ }
+
public static class TestParameters extends ContentPolicy.Parameters {
private final TestHostFetcher hostFetcher;
private final Distribution distribution;
+ public final TestWrappingInstabilityChecker instabilityChecker;
public TestParameters(String parameters, Set<Integer> nodes) {
super(SlobrokPolicy.parse(parameters));
hostFetcher = new TestHostFetcher(getClusterName(), nodes);
distribution = new Distribution(Distribution.getDefaultDistributionConfig(2, 10));
+ instabilityChecker = new TestWrappingInstabilityChecker(new ContentPolicy.PerNodeCountingInstabilityChecker(5));
}
@Override
@@ -160,6 +183,9 @@ public abstract class ContentPolicyTestEnvironment {
@Override
public Distribution createDistribution(SlobrokPolicy policy) { return distribution; }
+
+ @Override
+ public ContentPolicy.InstabilityChecker createInstabilityChecker() { return instabilityChecker; }
}
public static class ContentPolicyTestFactory implements RoutingPolicyFactory {
@@ -182,8 +208,6 @@ public abstract class ContentPolicyTestEnvironment {
}
}
public TestParameters getLastParameters() { return parameterInstances.getLast(); }
- public void destroy() {
- }
}
private int findPreferredAvailableNodeForTestBucket() {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
index 651a1c15c0b..d40c87e0bd6 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java
@@ -188,7 +188,7 @@ public abstract class Simulator extends ContentPolicyTestEnvironment {
if (badNode != null && randomizer.nextDouble() < badNode.getFailureRate()) {
++failed[half];
switch (badNode.getFailureType()) {
- case TRANSIENT_ERROR: replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_BUSY, "Transient error")); break;
+ case TRANSIENT_ERROR: replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_ABORTED, "Transient error")); break;
case FATAL_ERROR: replyError(target, new com.yahoo.messagebus.Error(DocumentProtocol.ERROR_UNPARSEABLE, "Fatal error")); break;
case OLD_CLUSTER_STATE:
case RESET_CLUSTER_STATE: