diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-02-15 14:10:23 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-02-15 14:10:23 +0000 |
commit | 1b840c9e91e60066d01a36b932bc321828035c94 (patch) | |
tree | 0628de4cd8f1468182443168156067d6a1f21959 /messagebus | |
parent | 2f53ef81724440d051749d486bdeade65d85ef1c (diff) |
More agile retry policy.
Diffstat (limited to 'messagebus')
5 files changed, 28 insertions, 55 deletions
diff --git a/messagebus/src/tests/resender/resender.cpp b/messagebus/src/tests/resender/resender.cpp index fa740e929fa..cd7fdbeb6cc 100644 --- a/messagebus/src/tests/resender/resender.cpp +++ b/messagebus/src/tests/resender/resender.cpp @@ -268,12 +268,11 @@ Test::testRetryDelay(TestData &data) EXPECT_TRUE(msg.get() == NULL); string trace = reply->getTrace().toString(); - printf("%s", trace.c_str()); - EXPECT_TRUE(trace.find("retry 1 in 0.01") != string::npos); - EXPECT_TRUE(trace.find("retry 2 in 0.02") != string::npos); - EXPECT_TRUE(trace.find("retry 3 in 0.03") != string::npos); - EXPECT_TRUE(trace.find("retry 4 in 0.04") != string::npos); - EXPECT_TRUE(trace.find("retry 5 in 0.05") != string::npos); + EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); + EXPECT_TRUE(trace.find("retry 2 in 0.020") != string::npos); + EXPECT_TRUE(trace.find("retry 3 in 0.040") != string::npos); + EXPECT_TRUE(trace.find("retry 4 in 0.080") != string::npos); + EXPECT_TRUE(trace.find("retry 5 in 0.160") != string::npos); } void @@ -298,11 +297,10 @@ Test::testRequestRetryDelay(TestData &data) EXPECT_TRUE(msg.get() == NULL); string trace = reply->getTrace().toString(); - printf("%s", trace.c_str()); - EXPECT_TRUE(trace.find("retry 1 in 0") != string::npos); - EXPECT_TRUE(trace.find("retry 2 in 0.02") != string::npos); - EXPECT_TRUE(trace.find("retry 3 in 0.04") != string::npos); - EXPECT_TRUE(trace.find("retry 4 in 0.06") != string::npos); - EXPECT_TRUE(trace.find("retry 5 in 0.08") != string::npos); + EXPECT_TRUE(trace.find("retry 1 in 0.000") != string::npos); + EXPECT_TRUE(trace.find("retry 2 in 0.020") != string::npos); + EXPECT_TRUE(trace.find("retry 3 in 0.040") != string::npos); + EXPECT_TRUE(trace.find("retry 4 in 0.060") != string::npos); + EXPECT_TRUE(trace.find("retry 5 in 0.080") != string::npos); } diff --git a/messagebus/src/tests/retrypolicy/retrypolicy.cpp b/messagebus/src/tests/retrypolicy/retrypolicy.cpp index 1ca452c4cf1..b03357b9498 100644 --- a/messagebus/src/tests/retrypolicy/retrypolicy.cpp +++ b/messagebus/src/tests/retrypolicy/retrypolicy.cpp @@ -12,14 +12,17 @@ int Test::Main() { TEST_INIT("retrypolicy_test"); - + constexpr double DELAY(0.001); RetryTransientErrorsPolicy policy; + policy.setBaseDelay(DELAY); + EXPECT_EQUAL(0.0, policy.getRetryDelay(0)); + EXPECT_EQUAL(0.0, policy.getRetryDelay(1)); + for (uint32_t j = 2; j < 15; ++j) { + EXPECT_EQUAL(DELAY*(1 << (j-1)), policy.getRetryDelay(j)); + } + EXPECT_EQUAL(10.0, policy.getRetryDelay(15)); + EXPECT_EQUAL(10.0, policy.getRetryDelay(20)); for (uint32_t i = 0; i < 5; ++i) { - double delay = i / 3.0; - policy.setBaseDelay(delay); - for (uint32_t j = 0; j < 5; ++j) { - EXPECT_EQUAL((int)(j * delay), (int)policy.getRetryDelay(j)); - } for (uint32_t j = ErrorCode::NONE; j < ErrorCode::ERROR_LIMIT; ++j) { policy.setEnabled(true); if (j < ErrorCode::FATAL_ERROR) { diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp index cb9ad2eef47..42c5938fe92 100644 --- a/messagebus/src/tests/routing/routing.cpp +++ b/messagebus/src/tests/routing/routing.cpp @@ -635,7 +635,6 @@ Test::testTrace(TestData &data, const std::vector<string> &expected) if (!EXPECT_TRUE(reply.get() != NULL)) { return false; } - printf("%s", reply->getTrace().toString().c_str()); if (!EXPECT_TRUE(!reply->hasErrors())) { return false; } @@ -747,7 +746,6 @@ Test::testNoRoutingTable(TestData &data) Result res = data._srcSession->send(createMessage("msg"), "foo"); EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); - printf("%s\n", res.getError().getMessage().c_str()); Message::UP msg = res.getMessage(); EXPECT_TRUE(msg.get() != NULL); } @@ -760,7 +758,6 @@ Test::testUnknownRoute(TestData &data) Result res = data._srcSession->send(createMessage("msg"), "baz"); EXPECT_TRUE(!res.isAccepted()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, res.getError().getCode()); - printf("%s\n", res.getError().getMessage().c_str()); Message::UP msg = res.getMessage(); EXPECT_TRUE(msg.get() != NULL); } @@ -771,7 +768,6 @@ Test::testNoRoute(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route()).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -787,7 +783,6 @@ Test::testRecognizeHopName(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -803,7 +798,6 @@ Test::testRecognizeRouteDirective(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -818,7 +812,6 @@ Test::testRecognizeRouteName(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -831,7 +824,6 @@ Test::testHopResolutionOverflow(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("foo")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -844,7 +836,6 @@ Test::testRouteResolutionOverflow(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), "foo").isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); } @@ -863,7 +854,6 @@ Test::testInsertRoute(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -875,7 +865,6 @@ Test::testErrorDirective(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), route).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::ILLEGAL_ROUTE, reply->getError(0).getCode()); EXPECT_EQUAL("err", reply->getError(0).getMessage()); @@ -907,7 +896,6 @@ Test::testSelectNone(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_SERVICES_FOR_ROUTE, reply->getError(0).getCode()); } @@ -925,7 +913,6 @@ Test::testSelectOne(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -951,7 +938,6 @@ Test::testResend1(TestData &data) data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[APP_TRANSIENT_ERROR @ localhost]: err1") @@ -987,7 +973,6 @@ Test::testResend2(TestData &data) data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Source session accepted a 3 byte message. 1 message(s) now pending.") @@ -1002,7 +987,7 @@ Test::testResend2(TestData &data) .add("Reply (type 0) received at client.") .add("Routing policy 'Custom' merging replies.") .add("Merged { 'dst/session' }.") - .add("Message scheduled for retry 1 in 0.00 seconds.") + .add("Message scheduled for retry 1 in 0.000 seconds.") .add("Resender resending message.") .add("Running routing policy 'Custom'.") .add("Selecting { 'dst/session' }.") @@ -1015,7 +1000,7 @@ Test::testResend2(TestData &data) .add("Reply (type 0) received at client.") .add("Routing policy 'Custom' merging replies.") .add("Merged { 'dst/session' }.") - .add("Message scheduled for retry 2 in 0.00 seconds.") + .add("Message scheduled for retry 2 in 0.000 seconds.") .add("Resender resending message.") .add("Running routing policy 'Custom'.") .add("Selecting { 'dst/session' }.") @@ -1044,7 +1029,6 @@ Test::testNoResend(TestData &data) data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_TRANSIENT_ERROR, reply->getError(0).getCode()); } @@ -1069,7 +1053,6 @@ Test::testSelectOnResend(TestData &data) data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1102,7 +1085,6 @@ Test::testNoSelectOnResend(TestData &data) data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Selecting { 'dst/session' }.") @@ -1129,7 +1111,6 @@ Test::testCanConsumeError(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_TRUE(testTrace(StringList() @@ -1175,7 +1156,6 @@ Test::testNestedPolicies(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); } @@ -1197,7 +1177,6 @@ Test::testRemoveReply(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("[NO_ADDRESS_FOR_SERVICE @ localhost]") @@ -1222,7 +1201,6 @@ Test::testSetReply(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1253,7 +1231,6 @@ Test::testResendSetAndReuseReply(TestData &data) data._dstSession->acknowledge(std::move(msg)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); } @@ -1277,7 +1254,6 @@ Test::testResendSetAndRemoveReply(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL("foo", reply->getError(0).getMessage()); @@ -1302,7 +1278,6 @@ Test::testHopIgnoresReply(TestData &data) data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1323,7 +1298,6 @@ Test::testHopBlueprintIgnoresReply(TestData &data) data._dstSession->reply(std::move(reply)); reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_TRUE(!reply->hasErrors()); EXPECT_TRUE(testTrace(StringList() .add("Not waiting for a reply from 'dst/session'."), @@ -1341,7 +1315,6 @@ Test::testAcceptEmptyRoute(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); } void @@ -1361,7 +1334,6 @@ Test::testAbortOnlyActiveNodes(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::APP_FATAL_ERROR, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::SEND_ABORTED, reply->getError(1).getCode()); @@ -1373,7 +1345,6 @@ Test::testUnknownPolicy(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("[Unknown]")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::UNKNOWN_POLICY, reply->getError(0).getCode()); } @@ -1392,7 +1363,6 @@ Test::testSelectException(TestData &data) .isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1417,7 +1387,6 @@ Test::testMergeException(TestData &data) data._dstSession->acknowledge(std::move(msg)); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(1u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::POLICY_ERROR, reply->getError(0).getCode()); @@ -1568,7 +1537,6 @@ Test::testTimeout(TestData &data) EXPECT_TRUE(data._srcSession->send(createMessage("msg"), Route::parse("dst/unknown")).isAccepted()); Reply::UP reply = data._srcHandler.getReply(RECEPTOR_TIMEOUT); ASSERT_TRUE(reply.get() != NULL); - printf("%s", reply->getTrace().toString().c_str()); EXPECT_EQUAL(2u, reply->getNumErrors()); EXPECT_EQUAL((uint32_t)ErrorCode::NO_ADDRESS_FOR_SERVICE, reply->getError(0).getCode()); EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(1).getCode()); diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index 048d81b697e..0ba6e0827b2 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -85,7 +85,7 @@ Resender::scheduleRetry(RoutingNode &node) node.prepareForRetry(); // consumes the reply node.getTrace().trace( TraceLevel::COMPONENT, - vespalib::make_string("Message scheduled for retry %u in %.2f seconds.", retry, delay)); + vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); _queue.push(Entry((uint64_t)(_time.MilliSecsToNow() + delay * 1000), &node)); return true; diff --git a/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp b/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp index 7a2147b13ff..5aa11104b09 100644 --- a/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp +++ b/messagebus/src/vespa/messagebus/routing/retrytransienterrorspolicy.cpp @@ -7,7 +7,7 @@ namespace mbus { RetryTransientErrorsPolicy::RetryTransientErrorsPolicy() : _enabled(true), - _baseDelay(1.0) + _baseDelay(0.001) {} RetryTransientErrorsPolicy & @@ -29,7 +29,11 @@ RetryTransientErrorsPolicy::canRetry(uint32_t errorCode) const { double RetryTransientErrorsPolicy::getRetryDelay(uint32_t retry) const { - return _baseDelay.load(std::memory_order_relaxed) * retry; + uint64_t retryMultiplier = 0l; + if (retry > 1) { + retryMultiplier = 1L << std::min(20u, retry-1); + } + return std::min(10.0, _baseDelay.load(std::memory_order_relaxed) * retryMultiplier); } } |