diff options
3 files changed, 98 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index f5bacfae252..16d5dcf3b25 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -72,11 +72,11 @@ public: } void sendPut(std::shared_ptr<api::PutCommand> msg) { - op.reset(new PutOperation(getExternalOperationHandler(), - getDistributorBucketSpace(), - msg, - getDistributor().getMetrics(). - puts[msg->getLoadType()])); + op = std::make_unique<PutOperation>(getExternalOperationHandler(), + getDistributorBucketSpace(), + msg, + getDistributor().getMetrics(). + puts[msg->getLoadType()]); op->start(_sender, framework::MilliSecTime(0)); } @@ -88,9 +88,11 @@ public: return std::make_shared<Document>(doc_type(), DocumentId(vespalib::make_string("id:%s:testdoctype1::%s", ns, id))); } - std::shared_ptr<api::PutCommand> createPut(Document::SP doc) const { + static std::shared_ptr<api::PutCommand> createPut(Document::SP doc) { return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100); } + + void set_up_3_nodes_and_send_put_with_create_bucket_acks(); }; PutOperationTest::~PutOperationTest() = default; @@ -624,4 +626,60 @@ TEST_F(PutOperationTest, replica_not_implicitly_activated_when_activation_is_dis do_test_creation_with_bucket_activation_disabled(true); } +void PutOperationTest::set_up_3_nodes_and_send_put_with_create_bucket_acks() { + setupDistributor(3, 3, "storage:3 distributor:1"); + + Document::SP doc(createDummyDocument("test", "test")); + sendPut(createPut(doc)); + + // Include CreateBucket to ensure we don't count it towards Put ACK majorities + ASSERT_EQ("Create bucket => 2,Create bucket => 1,Create bucket => 0," + "Put => 2,Put => 1,Put => 0", + _sender.getCommands(true)); + + // ACK all CreateBuckets + for (uint32_t i = 0; i < 3; ++i) { + sendReply(i); + } +} + +TEST_F(PutOperationTest, majority_ack_with_minority_tas_failure_returns_success) { + ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks()); + // Majority ACK, minority NACK + sendReply(3); + sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED); + sendReply(5); + + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(NONE)", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, minority_ack_with_majority_tas_failure_returns_tas_failure) { + ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks()); + // Minority ACK, majority NACK + sendReply(3); + sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED); + sendReply(5, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED); + + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(TEST_AND_SET_CONDITION_FAILED)", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_errors) { + ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks()); + // Minority ACK, majority NACK. But non-TaS failure. + sendReply(3); + sendReply(4, api::ReturnCode::ABORTED); + sendReply(5); + + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(ABORTED)", + _sender.getLastReply()); +} + } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index d98a2ce6a74..6362db6c002 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -2,7 +2,6 @@ #include "persistencemessagetracker.h" #include <vespa/storage/common/vectorprinter.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/persistence.h> #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" @@ -24,6 +23,8 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( _manager(link), _revertTimestamp(revertTimestamp), _requestTimer(link.getClock()), + _n_persistence_replies_total(0), + _n_successful_persistence_replies(0), _priority(_reply->getPriority()), _success(true) { @@ -204,9 +205,28 @@ PersistenceMessageTrackerImpl::shouldRevert() const && !_revertNodes.empty() && !_success && _reply; } +bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept { + // FIXME this has questionable interaction with early client ACK since we only count + // the number of observed replies rather than the number of total requests sent. + // ... but the early ACK-feature dearly needs a redesign anyway. + return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1)); +} + +bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept { + return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) + && has_majority_successful_replies()); +} + void PersistenceMessageTrackerImpl::sendReply(MessageSender& sender) { + // If we've observed _partial_ TaS failures but have had a majority of good ACKs, + // treat the reply as successful. This is because the ACKed write(s) will eventually + // become visible across all nodes. + if (has_minority_test_and_set_failure()) { + _reply->setResult(api::ReturnCode()); + } + updateMetrics(); _trace.setStrict(false); if ( ! _trace.isEmpty()) { @@ -245,11 +265,6 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply( { LOG(spam, "Create bucket reply failed, so deleting it from bucket db"); _manager.removeNodeFromDB(reply.getBucket(), node); - LOG_BUCKET_OPERATION_NO_LOCK( - reply.getBucketId(), - vespalib::make_string( - "Deleted bucket on node %u due to failing create bucket %s", - node, reply.getResult().toString().c_str())); } } @@ -258,12 +273,14 @@ PersistenceMessageTrackerImpl::handlePersistenceReply( api::BucketInfoReply& reply, uint16_t node) { + ++_n_persistence_replies_total; if (reply.getBucketInfo().valid()) { addBucketInfoFromReply(node, reply); } if (reply.getResult().success()) { logSuccessfulReply(node, reply); _revertNodes.emplace_back(reply.getBucket(), node); + ++_n_successful_persistence_replies; } else if (!hasSentReply()) { updateFailureResult(reply); } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 21977ddd881..4392aa1fc30 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -12,8 +12,8 @@ namespace storage::distributor { struct PersistenceMessageTracker { - virtual ~PersistenceMessageTracker() { } - typedef MessageTracker::ToSend ToSend; + virtual ~PersistenceMessageTracker() = default; + using ToSend = MessageTracker::ToSend; virtual void fail(MessageSender&, const api::ReturnCode&) = 0; virtual void queueMessageBatch(const std::vector<ToSend>&) = 0; @@ -29,7 +29,7 @@ class PersistenceMessageTrackerImpl : public PersistenceMessageTracker, public MessageTracker { private: - typedef std::map<document::Bucket, std::vector<BucketCopy> > BucketInfoMap; + using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; BucketInfoMap _remapBucketInfo; BucketInfoMap _bucketInfo; @@ -38,7 +38,7 @@ public: std::shared_ptr<api::BucketInfoReply> reply, DistributorComponent&, api::Timestamp revertTimestamp = 0); - ~PersistenceMessageTrackerImpl(); + ~PersistenceMessageTrackerImpl() override; void updateDB(); void updateMetrics(); @@ -52,7 +52,7 @@ public: void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override; std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; } - typedef std::pair<document::Bucket, uint16_t> BucketNodePair; + using BucketNodePair = std::pair<document::Bucket, uint16_t>; void revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes); @@ -65,7 +65,7 @@ public: void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override; private: - typedef std::vector<uint64_t> MessageBatch; + using MessageBatch = std::vector<uint64_t>; std::vector<MessageBatch> _messageBatches; PersistenceOperationMetricSet& _metric; @@ -75,14 +75,18 @@ private: std::vector<BucketNodePair> _revertNodes; mbus::TraceNode _trace; framework::MilliSecTimer _requestTimer; + uint32_t _n_persistence_replies_total; + uint32_t _n_successful_persistence_replies; uint8_t _priority; bool _success; bool canSendReplyEarly() const; void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply); void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const; - bool hasSentReply() const { return _reply.get() == 0; } + bool hasSentReply() const noexcept { return !_reply; } bool shouldRevert() const; + bool has_majority_successful_replies() const noexcept; + bool has_minority_test_and_set_failure() const noexcept; void sendReply(MessageSender& sender); void updateFailureResult(const api::BucketInfoReply& reply); void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node); |