summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-14 15:52:17 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-14 15:57:48 +0000
commit680baa130aac6b653bb2b5aae8589a18b97387dd (patch)
tree8cb6f66eb34caf92280c2298492e8f401bae869d
parentd52bbc80f5ae6ed0f8a6019caff2364304c427c8 (diff)
Gracefully abort outdated reindexing visitors
Instead of sending `REJECTED` when lock token does not match, send `TEST_AND_SET_CONDITION_FAILED` from distributors. The reindexing visitor will detect this and remap the failure code to `ABORTED` while simultaneously failing the backend visitor. `ABORTED` will be considered transient by the visiting client, allowing it to retry towards another distributor without the whole thing having to fail out.
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp10
-rw-r--r--storage/src/tests/visiting/visitortest.cpp83
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp4
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.cpp12
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.h1
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp11
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h6
7 files changed, 104 insertions, 23 deletions
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 520e5c31cfb..e566f16dcdd 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -610,10 +610,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte
auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
put->setCondition(bucket_lock_bypass_tas_condition("foo"));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
- // TODO determine most appropriate error code here. Want to fail the bucket but
- // not the entire visitor operation. Will likely need to be revisited (heh!) soon.
- EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
- "but none currently exists)",
+ EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket "
+ "lock to be present, but none currently exists)",
_sender.reply(0)->getResult().toString());
}
@@ -625,8 +623,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte
Operation::SP op;
ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put)));
- EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
- "but none currently exists)",
+ EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket "
+ "lock to be present, but none currently exists)",
_sender.reply(0)->getResult().toString());
}
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index b362e368b9b..a8c317655ca 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -1,21 +1,22 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/config/common/exceptions.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/storageapi/message/datagram.h>
-#include <vespa/storageapi/message/persistence.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/visitor.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/visiting/visitormanager.h>
+#include <vespa/storageapi/message/datagram.h>
+#include <vespa/storageapi/message/persistence.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <tests/storageserver/testvisitormessagesession.h>
-#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/visitor.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <thread>
#include <sys/stat.h>
@@ -681,8 +682,7 @@ VisitorTest::sendInitialCreateVisitorAndGetIterRound()
{
GetIterCommand::SP getIterCmd;
ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, getIterCmd));
- sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK),
- 1, true);
+ sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1, true);
}
}
@@ -729,8 +729,7 @@ TEST_F(VisitorTest, no_visitor_notification_for_transient_failures) {
}
TEST_F(VisitorTest, notification_sent_if_transient_error_retried_many_times) {
- constexpr size_t retries(
- Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY);
+ constexpr size_t retries = Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY;
ASSERT_NO_FATAL_FAILURE(initializeTest());
sendInitialCreateVisitorAndGetIterRound();
@@ -765,7 +764,6 @@ VisitorTest::doCompleteVisitingSession(
const std::shared_ptr<api::CreateVisitorCommand>& cmd,
std::shared_ptr<api::CreateVisitorReply>& reply_out)
{
- initializeTest();
_top->sendDown(cmd);
sendCreateIteratorReply();
@@ -795,6 +793,7 @@ VisitorTest::doCompleteVisitingSession(
}
TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
cmd->getTrace().setLevel(0);
std::shared_ptr<api::CreateVisitorReply> reply;
@@ -803,6 +802,7 @@ TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) {
}
TEST_F(VisitorTest, reply_contains_trace_if_trace_level_above_zero) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
cmd->getTrace().setLevel(1);
cmd->getTrace().trace(1,"at least one trace.");
@@ -887,4 +887,65 @@ TEST_F(VisitorTest, test_visitor_invokes_weak_read_consistency_iteration) {
"testvisitor", spi::ReadConsistency::WEAK);
}
+struct ReindexingVisitorTest : VisitorTest {
+ void respond_with_docs_from_persistence() {
+ sendCreateIteratorReply();
+ GetIterCommand::SP get_iter_cmd;
+ // Reply to GetIter with a single doc and bucket completed
+ ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, get_iter_cmd));
+ sendGetIterReply(*get_iter_cmd, api::ReturnCode(api::ReturnCode::OK), 1, true);
+ }
+
+ void respond_to_client_put(api::ReturnCode::Result result) {
+ // Reply to the Put from "client" back to the visitor
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> doc_ids;
+ std::vector<std::string> info_messages;
+ getMessagesAndReply(1, getSession(0), docs, doc_ids, info_messages, result);
+ }
+
+ void complete_visitor() {
+ DestroyIteratorCommand::SP destroy_iter_cmd;
+ ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<DestroyIteratorCommand>(*_bottom, destroy_iter_cmd));
+ }
+};
+
+TEST_F(ReindexingVisitorTest, puts_are_sent_with_tas_condition) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
+ auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor"));
+ cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar");
+ _top->sendDown(cmd);
+
+ ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence());
+ auto& session = getSession(0);
+ session.waitForMessages(1);
+
+ ASSERT_EQ(session.sentMessages.size(), 1u);
+ auto* put_cmd = dynamic_cast<documentapi::PutDocumentMessage*>(session.sentMessages.front().get());
+ ASSERT_TRUE(put_cmd);
+ auto token_str = vespalib::make_string("%s=foobar", reindexing_bucket_lock_bypass_prefix());
+ EXPECT_EQ(put_cmd->getCondition().getSelection(), token_str);
+
+ ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::OK));
+ ASSERT_NO_FATAL_FAILURE(complete_visitor());
+
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK));
+ ASSERT_TRUE(waitUntilNoActiveVisitors());
+}
+
+
+TEST_F(ReindexingVisitorTest, tas_responses_fail_the_visitor_and_are_rewritten_to_aborted) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
+ auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor"));
+ cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar");
+ _top->sendDown(cmd);
+
+ ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence());
+ ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED));
+ ASSERT_NO_FATAL_FAILURE(complete_visitor());
+
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ABORTED, -1, -1));
+ ASSERT_TRUE(waitUntilNoActiveVisitors());
+}
+
} // namespace storage
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 7c468457430..c7466c5f420 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -301,12 +301,12 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op
allow = true;
} else {
- bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
"Expected bucket lock token did not match actual lock token"));
return true;
}
} else {
- bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
"Operation expects a read-for-write bucket lock to be present, "
"but none currently exists"));
return true;
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
index 2d96876b641..5577bcd00ae 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
@@ -19,7 +19,8 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
HitCounter& hitCounter)
{
auto lock_token = make_lock_access_token();
- LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size());
+ LOG(debug, "ReindexingVisitor %s handling block of %zu documents. Using access token '%s'",
+ _id.c_str(), entries.size(), lock_token.c_str());
for (auto& entry : entries) {
if (entry->isRemove()) {
// We don't reindex removed documents, as that would be very silly.
@@ -34,6 +35,15 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
}
}
+bool ReindexingVisitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) {
+ if (in_out_code.getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) {
+ in_out_code = api::ReturnCode(api::ReturnCode::ABORTED, "Got TaS failure from upstream, indicating visitor is "
+ "outdated. Aborting session to allow client to retry");
+ return true;
+ }
+ return Visitor::remap_docapi_message_error_code(in_out_code);
+}
+
vespalib::string ReindexingVisitor::make_lock_access_token() const {
vespalib::string prefix = reindexing_bucket_lock_bypass_prefix();
vespalib::stringref passed_token = visitor_parameters().get(
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h
index 815ef7e67fa..0fdba607134 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.h
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h
@@ -21,6 +21,7 @@ public:
private:
void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override;
+ bool remap_docapi_message_error_code(api::ReturnCode& in_out_code) override;
vespalib::string make_lock_access_token() const;
};
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index c846e0357b4..e69721e8177 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -607,6 +607,11 @@ Visitor::visitor_parameters() const noexcept {
return _initiatingCmd->getParameters();
}
+bool
+Visitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) {
+ return in_out_code.isCriticalForVisitor();
+}
+
void
Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics)
{
@@ -640,8 +645,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
metrics.visitorDestinationFailureReplies.inc();
if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) {
- LOG(debug, "Aborting visitor as we failed to talk to "
- "controller: %s",
+ LOG(debug, "Aborting visitor as we failed to talk to controller: %s",
reply->getError(0).toString().c_str());
api::ReturnCode returnCode(
static_cast<api::ReturnCode::Result>(
@@ -655,7 +659,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
api::ReturnCode returnCode(
static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
reply->getError(0).getMessage());
- if (returnCode.isCriticalForVisitor()) {
+ const bool should_fail = remap_docapi_message_error_code(returnCode);
+ if (should_fail) {
// Abort - something is wrong with target.
fail(returnCode, true);
close();
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 6666308a0b9..d6b4c4b9381 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -350,6 +350,12 @@ protected:
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
const vdslib::Parameters& visitor_parameters() const noexcept;
+
+ // Possibly modifies the ReturnCode parameter in-place if its return code should
+ // be changed based on visitor subclass-specific behavior.
+ // Returns true if the visitor itself should be failed (aborted) with the
+ // error code, false if the DocumentAPI message should be retried later.
+ [[nodiscard]] virtual bool remap_docapi_message_error_code(api::ReturnCode& in_out_code);
public:
Visitor(StorageComponent& component);
virtual ~Visitor();