diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-14 15:52:17 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-14 15:57:48 +0000 |
commit | 680baa130aac6b653bb2b5aae8589a18b97387dd (patch) | |
tree | 8cb6f66eb34caf92280c2298492e8f401bae869d | |
parent | d52bbc80f5ae6ed0f8a6019caff2364304c427c8 (diff) |
Gracefully abort outdated reindexing visitors
Instead of sending `REJECTED` when lock token does not match, send
`TEST_AND_SET_CONDITION_FAILED` from distributors. The reindexing visitor
will detect this and remap the failure code to `ABORTED` while
simultaneously failing the backend visitor.
`ABORTED` will be considered transient by the visiting client, allowing
it to retry towards another distributor without the whole thing
having to fail out.
7 files changed, 104 insertions, 23 deletions
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 520e5c31cfb..e566f16dcdd 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -610,10 +610,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); put->setCondition(bucket_lock_bypass_tas_condition("foo")); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); - // TODO determine most appropriate error code here. Want to fail the bucket but - // not the entire visitor operation. Will likely need to be revisited (heh!) soon. - EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " - "but none currently exists)", + EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket " + "lock to be present, but none currently exists)", _sender.reply(0)->getResult().toString()); } @@ -625,8 +623,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte Operation::SP op; ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op)); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put))); - EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " - "but none currently exists)", + EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket " + "lock to be present, but none currently exists)", _sender.reply(0)->getResult().toString()); } diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index b362e368b9b..a8c317655ca 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -1,21 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/config/common/exceptions.h> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/test/make_bucket_space.h> -#include <vespa/storageapi/message/datagram.h> -#include <vespa/storageapi/message/persistence.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/visitor.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/visiting/visitormanager.h> +#include <vespa/storageapi/message/datagram.h> +#include <vespa/storageapi/message/persistence.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <tests/storageserver/testvisitormessagesession.h> -#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/visitor.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/gtest/gtest.h> #include <thread> #include <sys/stat.h> @@ -681,8 +682,7 @@ VisitorTest::sendInitialCreateVisitorAndGetIterRound() { GetIterCommand::SP getIterCmd; ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, getIterCmd)); - sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), - 1, true); + sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1, true); } } @@ -729,8 +729,7 @@ TEST_F(VisitorTest, no_visitor_notification_for_transient_failures) { } TEST_F(VisitorTest, notification_sent_if_transient_error_retried_many_times) { - constexpr size_t retries( - Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY); + constexpr size_t retries = Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY; ASSERT_NO_FATAL_FAILURE(initializeTest()); sendInitialCreateVisitorAndGetIterRound(); @@ -765,7 +764,6 @@ VisitorTest::doCompleteVisitingSession( const std::shared_ptr<api::CreateVisitorCommand>& cmd, std::shared_ptr<api::CreateVisitorReply>& reply_out) { - initializeTest(); _top->sendDown(cmd); sendCreateIteratorReply(); @@ -795,6 +793,7 @@ VisitorTest::doCompleteVisitingSession( } TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); cmd->getTrace().setLevel(0); std::shared_ptr<api::CreateVisitorReply> reply; @@ -803,6 +802,7 @@ TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) { } TEST_F(VisitorTest, reply_contains_trace_if_trace_level_above_zero) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); cmd->getTrace().setLevel(1); cmd->getTrace().trace(1,"at least one trace."); @@ -887,4 +887,65 @@ TEST_F(VisitorTest, test_visitor_invokes_weak_read_consistency_iteration) { "testvisitor", spi::ReadConsistency::WEAK); } +struct ReindexingVisitorTest : VisitorTest { + void respond_with_docs_from_persistence() { + sendCreateIteratorReply(); + GetIterCommand::SP get_iter_cmd; + // Reply to GetIter with a single doc and bucket completed + ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, get_iter_cmd)); + sendGetIterReply(*get_iter_cmd, api::ReturnCode(api::ReturnCode::OK), 1, true); + } + + void respond_to_client_put(api::ReturnCode::Result result) { + // Reply to the Put from "client" back to the visitor + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> doc_ids; + std::vector<std::string> info_messages; + getMessagesAndReply(1, getSession(0), docs, doc_ids, info_messages, result); + } + + void complete_visitor() { + DestroyIteratorCommand::SP destroy_iter_cmd; + ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<DestroyIteratorCommand>(*_bottom, destroy_iter_cmd)); + } +}; + +TEST_F(ReindexingVisitorTest, puts_are_sent_with_tas_condition) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); + auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor")); + cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar"); + _top->sendDown(cmd); + + ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence()); + auto& session = getSession(0); + session.waitForMessages(1); + + ASSERT_EQ(session.sentMessages.size(), 1u); + auto* put_cmd = dynamic_cast<documentapi::PutDocumentMessage*>(session.sentMessages.front().get()); + ASSERT_TRUE(put_cmd); + auto token_str = vespalib::make_string("%s=foobar", reindexing_bucket_lock_bypass_prefix()); + EXPECT_EQ(put_cmd->getCondition().getSelection(), token_str); + + ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::OK)); + ASSERT_NO_FATAL_FAILURE(complete_visitor()); + + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK)); + ASSERT_TRUE(waitUntilNoActiveVisitors()); +} + + +TEST_F(ReindexingVisitorTest, tas_responses_fail_the_visitor_and_are_rewritten_to_aborted) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); + auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor")); + cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar"); + _top->sendDown(cmd); + + ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence()); + ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)); + ASSERT_NO_FATAL_FAILURE(complete_visitor()); + + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ABORTED, -1, -1)); + ASSERT_TRUE(waitUntilNoActiveVisitors()); +} + } // namespace storage diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 7c468457430..c7466c5f420 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -301,12 +301,12 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op allow = true; } else { - bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "Expected bucket lock token did not match actual lock token")); return true; } } else { - bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "Operation expects a read-for-write bucket lock to be present, " "but none currently exists")); return true; diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp index 2d96876b641..5577bcd00ae 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp @@ -19,7 +19,8 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, HitCounter& hitCounter) { auto lock_token = make_lock_access_token(); - LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size()); + LOG(debug, "ReindexingVisitor %s handling block of %zu documents. Using access token '%s'", + _id.c_str(), entries.size(), lock_token.c_str()); for (auto& entry : entries) { if (entry->isRemove()) { // We don't reindex removed documents, as that would be very silly. @@ -34,6 +35,15 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, } } +bool ReindexingVisitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) { + if (in_out_code.getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) { + in_out_code = api::ReturnCode(api::ReturnCode::ABORTED, "Got TaS failure from upstream, indicating visitor is " + "outdated. Aborting session to allow client to retry"); + return true; + } + return Visitor::remap_docapi_message_error_code(in_out_code); +} + vespalib::string ReindexingVisitor::make_lock_access_token() const { vespalib::string prefix = reindexing_bucket_lock_bypass_prefix(); vespalib::stringref passed_token = visitor_parameters().get( diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h index 815ef7e67fa..0fdba607134 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.h +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h @@ -21,6 +21,7 @@ public: private: void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override; + bool remap_docapi_message_error_code(api::ReturnCode& in_out_code) override; vespalib::string make_lock_access_token() const; }; diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index c846e0357b4..e69721e8177 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -607,6 +607,11 @@ Visitor::visitor_parameters() const noexcept { return _initiatingCmd->getParameters(); } +bool +Visitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) { + return in_out_code.isCriticalForVisitor(); +} + void Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics) { @@ -640,8 +645,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to " - "controller: %s", + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); api::ReturnCode returnCode( static_cast<api::ReturnCode::Result>( @@ -655,7 +659,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met api::ReturnCode returnCode( static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), reply->getError(0).getMessage()); - if (returnCode.isCriticalForVisitor()) { + const bool should_fail = remap_docapi_message_error_code(returnCode); + if (should_fail) { // Abort - something is wrong with target. fail(returnCode, true); close(); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 6666308a0b9..d6b4c4b9381 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -350,6 +350,12 @@ protected: bool addBoundedTrace(uint32_t level, const vespalib::string& message); const vdslib::Parameters& visitor_parameters() const noexcept; + + // Possibly modifies the ReturnCode parameter in-place if its return code should + // be changed based on visitor subclass-specific behavior. + // Returns true if the visitor itself should be failed (aborted) with the + // error code, false if the DocumentAPI message should be retried later. + [[nodiscard]] virtual bool remap_docapi_message_error_code(api::ReturnCode& in_out_code); public: Visitor(StorageComponent& component); virtual ~Visitor(); |