aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-02-15 14:10:23 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-02-15 14:10:23 +0000
commit1b840c9e91e60066d01a36b932bc321828035c94 (patch)
tree0628de4cd8f1468182443168156067d6a1f21959
parent2f53ef81724440d051749d486bdeade65d85ef1c (diff)
More agile retry policy.
-rw-r--r--messagebus/src/tests/resender/resender.cpp22
-rw-r--r--messagebus/src/tests/retrypolicy/retrypolicy.cpp15
-rw-r--r--messagebus/src/tests/routing/routing.cpp36
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp8
5 files changed, 28 insertions, 55 deletions
diff --git a/messagebus/src/tests/resender/resender.cpp b/messagebus/src/tests/resender/resender.cpp
index fa740e929fa..cd7fdbeb6cc 100644
--- a/messagebus/src/tests/resender/resender.cpp
+++ b/messagebus/src/tests/resender/resender.cpp
@@ -268,12 +268,11 @@ Test::testRetryDelay(TestData &data)
EXPECT_TRUE(msg.get() == NULL);
string trace = reply->getTrace().toString();
- printf("%s", trace.c_str());
- EXPECT_TRUE(trace.find("retry 1 in 0.01") != string::npos);
- EXPECT_TRUE(trace.find("retry 2 in 0.02") != string::npos);
- EXPECT_TRUE(trace.find("retry 3 in 0.03") != string::npos);
- EXPECT_TRUE(trace.find("retry 4 in 0.04") != string::npos);
- EXPECT_TRUE(trace.find("retry 5 in 0.05") != string::npos);
+ EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos);
+ EXPECT_TRUE(trace.find("retry 2 in 0.020") != string::npos);
+ EXPECT_TRUE(trace.find("retry 3 in 0.040") != string::npos);
+ EXPECT_TRUE(trace.find("retry 4 in 0.080") != string::npos);
+ EXPECT_TRUE(trace.find("retry 5 in 0.160") != string::npos);
}
void
@@ -298,11 +297,10 @@ Test::testRequestRetryDelay(TestData &data)
EXPECT_TRUE(msg.get() == NULL);
string trace = reply->getTrace().toString();
- printf("%s", trace.c_str());
- EXPECT_TRUE(trace.find("retry 1 in 0") != string::npos);
- EXPECT_TRUE(trace.find("retry 2 in 0.02") != string::npos);
- EXPECT_TRUE(trace.find("retry 3 in 0.04") != string::npos);
- EXPECT_TRUE(trace.find("retry 4 in 0.06") != string::npos);
- EXPECT_TRUE(trace.find("retry 5 in 0.08") != string::npos);
+ EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos);
+ EXPECT_TRUE(trace.find("retry 2 in 0.020") != string::npos);
+ EXPECT_TRUE(trace.find("retry 3 in 0.040") != string::npos);
+ EXPECT_TRUE(trace.find("retry 4 in 0.060") != string::npos);
+ EXPECT_TRUE(trace.find("retry 5 in 0.080") != string::npos);
}
diff --git a/messagebus/src/tests/retrypolicy/retrypolicy.cpp b/messagebus/src/tests/retrypolicy/retrypolicy.cpp
index 1ca452c4cf1..b03357b9498 100644
--- a/messagebus/src/tests/retrypolicy/retrypolicy.cpp
+++ b/messagebus/src/tests/retrypolicy/retrypolicy.cpp
@@ -12,14 +12,17 @@ int
Test::Main()
{
TEST_INIT("retrypolicy_test");
-
+ constexpr double DELAY(0.001);
RetryTransientErrorsPolicy policy;
+ policy.setBaseDelay(DELAY);
+ EXPECT_EQUAL(0.0, policy.getRetryDelay(0));
+ EXPECT_EQUAL(0.0, policy.getRetryDelay(1));
+ for (uint32_t j = 2; j < 15; ++j) {
+ EXPECT_EQUAL(DELAY*(1 << (j-1)), policy.getRetryDelay(j));
+ }
+ EXPECT_EQUAL(10.0, policy.getRetryDelay(15));
+ EXPECT_EQUAL(10.0, policy.getRetryDelay(20));
for (uint32_t i = 0; i < 5; ++i) {
- double delay = i / 3.0;
- policy.setBaseDelay(delay);
- for (uint32_t j = 0; j < 5; ++j) {
- EXPECT_EQUAL((int)(j * delay), (int)policy.getRetryDelay(j));
- }
for (uint32_t j = ErrorCode::NONE; j < ErrorCode::ERROR_LIMIT; ++j) {
policy.setEnabled(true);
if (j < ErrorCode::FATAL_ERROR) {
diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp
index cb9ad2eef47..42c5938fe92 100644
--- a/messagebus/src/tests/routing/routing.cpp
+++ b/messagebus/src/tests/routing/routing.cpp
@@ -635,7 +635,6 @@ Test::testTrace(TestData &data, const std::vector<string> &expected)
if (!EXPECT_TRUE(reply.get() != NULL)) {
return false;
}
- printf("%s", reply->getTrace().toString().c_str());
if (!EXPECT_TRUE(!reply->hasErrors())) {
return false;
}
@@ -747,7 +746,6 @@ Test::testNoRoutingTable(TestData &data)
Result res = data._srcSession->send(createMessage("msg"), "foo");
EXPECT_TRUE(!res.isAccepted());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode());
- printf("%s\n", res.getError().getMessage().c_str());
Message::UP msg = res.getMessage();
EXPECT_TRUE(msg.get() != NULL);
}
@@ -760,7 +758,6 @@ Test::testUnknownRoute(TestData &data)
Result res = data._srcSession->send(createMessage("msg"), "baz");
EXPECT_TRUE(!res.isAccepted());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode());
- printf("%s\n", res.getError().getMessage().c_str());
Message::UP msg = res.getMessage();
EXPECT_TRUE(msg.get() != NULL);
}
@@ -771,7 +768,6 @@ Test::testNoRoute(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route()).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -787,7 +783,6 @@ Test::testRecognizeHopName(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -803,7 +798,6 @@ Test::testRecognizeRouteDirective(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -818,7 +812,6 @@ Test::testRecognizeRouteName(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -831,7 +824,6 @@ Test::testHopResolutionOverflow(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -844,7 +836,6 @@ Test::testRouteResolutionOverflow(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), "foo").isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
}
@@ -863,7 +854,6 @@ Test::testInsertRoute(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -875,7 +865,6 @@ Test::testErrorDirective(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode());
EXPECT_EQUAL("err", reply->getError(0).getMessage());
@@ -907,7 +896,6 @@ Test::testSelectNone(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_SERVICES_FOR_ROUTE, reply->getError(0).getCode());
}
@@ -925,7 +913,6 @@ Test::testSelectOne(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -951,7 +938,6 @@ Test::testResend1(TestData &data)
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("[APP_TRANSIENT_ERROR @ localhost]: err1")
@@ -987,7 +973,6 @@ Test::testResend2(TestData &data)
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Source session accepted a 3 byte message. 1 message(s) now pending.")
@@ -1002,7 +987,7 @@ Test::testResend2(TestData &data)
.add("Reply (type 0) received at client.")
.add("Routing policy 'Custom' merging replies.")
.add("Merged { 'dst/session' }.")
- .add("Message scheduled for retry 1 in 0.00 seconds.")
+ .add("Message scheduled for retry 1 in 0.000 seconds.")
.add("Resender resending message.")
.add("Running routing policy 'Custom'.")
.add("Selecting { 'dst/session' }.")
@@ -1015,7 +1000,7 @@ Test::testResend2(TestData &data)
.add("Reply (type 0) received at client.")
.add("Routing policy 'Custom' merging replies.")
.add("Merged { 'dst/session' }.")
- .add("Message scheduled for retry 2 in 0.00 seconds.")
+ .add("Message scheduled for retry 2 in 0.000 seconds.")
.add("Resender resending message.")
.add("Running routing policy 'Custom'.")
.add("Selecting { 'dst/session' }.")
@@ -1044,7 +1029,6 @@ Test::testNoResend(TestData &data)
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_TRANSIENT_ERROR, reply->getError(0).getCode());
}
@@ -1069,7 +1053,6 @@ Test::testSelectOnResend(TestData &data)
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Selecting { 'dst/session' }.")
@@ -1102,7 +1085,6 @@ Test::testNoSelectOnResend(TestData &data)
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Selecting { 'dst/session' }.")
@@ -1129,7 +1111,6 @@ Test::testCanConsumeError(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
EXPECT_TRUE(testTrace(StringList()
@@ -1175,7 +1156,6 @@ Test::testNestedPolicies(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
}
@@ -1197,7 +1177,6 @@ Test::testRemoveReply(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("[NO_ADDRESS_FOR_SERVICE @ localhost]")
@@ -1222,7 +1201,6 @@ Test::testSetReply(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL("foo", reply->getError(0).getMessage());
@@ -1253,7 +1231,6 @@ Test::testResendSetAndReuseReply(TestData &data)
data._dstSession->acknowledge(std::move(msg));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
}
@@ -1277,7 +1254,6 @@ Test::testResendSetAndRemoveReply(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL("foo", reply->getError(0).getMessage());
@@ -1302,7 +1278,6 @@ Test::testHopIgnoresReply(TestData &data)
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Not waiting for a reply from 'dst/session'."),
@@ -1323,7 +1298,6 @@ Test::testHopBlueprintIgnoresReply(TestData &data)
data._dstSession->reply(std::move(reply));
reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_TRUE(!reply->hasErrors());
EXPECT_TRUE(testTrace(StringList()
.add("Not waiting for a reply from 'dst/session'."),
@@ -1341,7 +1315,6 @@ Test::testAcceptEmptyRoute(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
}
void
@@ -1361,7 +1334,6 @@ Test::testAbortOnlyActiveNodes(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(2u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode());
EXPECT_EQUAL((uint32_t)ErrorCode::SEND_ABORTED, reply->getError(1).getCode());
@@ -1373,7 +1345,6 @@ Test::testUnknownPolicy(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Unknown]")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::UNKNOWN_POLICY, reply->getError(0).getCode());
}
@@ -1392,7 +1363,6 @@ Test::testSelectException(TestData &data)
.isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR,
reply->getError(0).getCode());
@@ -1417,7 +1387,6 @@ Test::testMergeException(TestData &data)
data._dstSession->acknowledge(std::move(msg));
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR,
reply->getError(0).getCode());
@@ -1568,7 +1537,6 @@ Test::testTimeout(TestData &data)
EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/unknown")).isAccepted());
Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT);
ASSERT_TRUE(reply.get() != NULL);
- printf("%s", reply->getTrace().toString().c_str());
EXPECT_EQUAL(2u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(1).getCode());
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index 048d81b697e..0ba6e0827b2 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -85,7 +85,7 @@ Resender::scheduleRetry(RoutingNode &node)
node.prepareForRetry(); // consumes the reply
node.getTrace().trace(
TraceLevel::COMPONENT,
- vespalib::make_string("Message scheduled for retry %u in %.2f seconds.", retry, delay));
+ vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
_queue.push(Entry((uint64_t)(_time.MilliSecsToNow() + delay * 1000), &node));
return true;
diff --git a/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp b/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp
index 7a2147b13ff..5aa11104b09 100644
--- a/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp
+++ b/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp
@@ -7,7 +7,7 @@ namespace mbus {
RetryTransientErrorsPolicy::RetryTransientErrorsPolicy() :
_enabled(true),
- _baseDelay(1.0)
+ _baseDelay(0.001)
{}
RetryTransientErrorsPolicy &
@@ -29,7 +29,11 @@ RetryTransientErrorsPolicy::canRetry(uint32_t errorCode) const {
double
RetryTransientErrorsPolicy::getRetryDelay(uint32_t retry) const {
- return _baseDelay.load(std::memory_order_relaxed) * retry;
+ uint64_t retryMultiplier = 0l;
+ if (retry > 1) {
+ retryMultiplier = 1L << std::min(20u, retry-1);
+ }
+ return std::min(10.0, _baseDelay.load(std::memory_order_relaxed) * retryMultiplier);
}
}