summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp70
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h18
3 files changed, 98 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index f5bacfae252..16d5dcf3b25 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -72,11 +72,11 @@ public:
}
void sendPut(std::shared_ptr<api::PutCommand> msg) {
- op.reset(new PutOperation(getExternalOperationHandler(),
- getDistributorBucketSpace(),
- msg,
- getDistributor().getMetrics().
- puts[msg->getLoadType()]));
+ op = std::make_unique<PutOperation>(getExternalOperationHandler(),
+ getDistributorBucketSpace(),
+ msg,
+ getDistributor().getMetrics().
+ puts[msg->getLoadType()]);
op->start(_sender, framework::MilliSecTime(0));
}
@@ -88,9 +88,11 @@ public:
return std::make_shared<Document>(doc_type(), DocumentId(vespalib::make_string("id:%s:testdoctype1::%s", ns, id)));
}
- std::shared_ptr<api::PutCommand> createPut(Document::SP doc) const {
+ static std::shared_ptr<api::PutCommand> createPut(Document::SP doc) {
return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100);
}
+
+ void set_up_3_nodes_and_send_put_with_create_bucket_acks();
};
PutOperationTest::~PutOperationTest() = default;
@@ -624,4 +626,60 @@ TEST_F(PutOperationTest, replica_not_implicitly_activated_when_activation_is_dis
do_test_creation_with_bucket_activation_disabled(true);
}
+void PutOperationTest::set_up_3_nodes_and_send_put_with_create_bucket_acks() {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+
+ Document::SP doc(createDummyDocument("test", "test"));
+ sendPut(createPut(doc));
+
+ // Include CreateBucket to ensure we don't count it towards Put ACK majorities
+ ASSERT_EQ("Create bucket => 2,Create bucket => 1,Create bucket => 0,"
+ "Put => 2,Put => 1,Put => 0",
+ _sender.getCommands(true));
+
+ // ACK all CreateBuckets
+ for (uint32_t i = 0; i < 3; ++i) {
+ sendReply(i);
+ }
+}
+
+TEST_F(PutOperationTest, majority_ack_with_minority_tas_failure_returns_success) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Majority ACK, minority NACK
+ sendReply(3);
+ sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+ sendReply(5);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, minority_ack_with_majority_tas_failure_returns_tas_failure) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Minority ACK, majority NACK
+ sendReply(3);
+ sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+ sendReply(5, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(TEST_AND_SET_CONDITION_FAILED)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_errors) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Minority ACK, majority NACK. But non-TaS failure.
+ sendReply(3);
+ sendReply(4, api::ReturnCode::ABORTED);
+ sendReply(5);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(ABORTED)",
+ _sender.getLastReply());
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index d98a2ce6a74..6362db6c002 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -2,7 +2,6 @@
#include "persistencemessagetracker.h"
#include <vespa/storage/common/vectorprinter.h>
-#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
@@ -24,6 +23,8 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
_manager(link),
_revertTimestamp(revertTimestamp),
_requestTimer(link.getClock()),
+ _n_persistence_replies_total(0),
+ _n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
_success(true)
{
@@ -204,9 +205,28 @@ PersistenceMessageTrackerImpl::shouldRevert() const
&& !_revertNodes.empty() && !_success && _reply;
}
+bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept {
+ // FIXME this has questionable interaction with early client ACK since we only count
+ // the number of observed replies rather than the number of total requests sent.
+ // ... but the early ACK-feature dearly needs a redesign anyway.
+ return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1));
+}
+
+bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept {
+ return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)
+ && has_majority_successful_replies());
+}
+
void
PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
{
+ // If we've observed _partial_ TaS failures but have had a majority of good ACKs,
+ // treat the reply as successful. This is because the ACKed write(s) will eventually
+ // become visible across all nodes.
+ if (has_minority_test_and_set_failure()) {
+ _reply->setResult(api::ReturnCode());
+ }
+
updateMetrics();
_trace.setStrict(false);
if ( ! _trace.isEmpty()) {
@@ -245,11 +265,6 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
_manager.removeNodeFromDB(reply.getBucket(), node);
- LOG_BUCKET_OPERATION_NO_LOCK(
- reply.getBucketId(),
- vespalib::make_string(
- "Deleted bucket on node %u due to failing create bucket %s",
- node, reply.getResult().toString().c_str()));
}
}
@@ -258,12 +273,14 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(
api::BucketInfoReply& reply,
uint16_t node)
{
+ ++_n_persistence_replies_total;
if (reply.getBucketInfo().valid()) {
addBucketInfoFromReply(node, reply);
}
if (reply.getResult().success()) {
logSuccessfulReply(node, reply);
_revertNodes.emplace_back(reply.getBucket(), node);
+ ++_n_successful_persistence_replies;
} else if (!hasSentReply()) {
updateFailureResult(reply);
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 21977ddd881..4392aa1fc30 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -12,8 +12,8 @@
namespace storage::distributor {
struct PersistenceMessageTracker {
- virtual ~PersistenceMessageTracker() { }
- typedef MessageTracker::ToSend ToSend;
+ virtual ~PersistenceMessageTracker() = default;
+ using ToSend = MessageTracker::ToSend;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
virtual void queueMessageBatch(const std::vector<ToSend>&) = 0;
@@ -29,7 +29,7 @@ class PersistenceMessageTrackerImpl : public PersistenceMessageTracker,
public MessageTracker
{
private:
- typedef std::map<document::Bucket, std::vector<BucketCopy> > BucketInfoMap;
+ using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
BucketInfoMap _remapBucketInfo;
BucketInfoMap _bucketInfo;
@@ -38,7 +38,7 @@ public:
std::shared_ptr<api::BucketInfoReply> reply,
DistributorComponent&,
api::Timestamp revertTimestamp = 0);
- ~PersistenceMessageTrackerImpl();
+ ~PersistenceMessageTrackerImpl() override;
void updateDB();
void updateMetrics();
@@ -52,7 +52,7 @@ public:
void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override;
std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; }
- typedef std::pair<document::Bucket, uint16_t> BucketNodePair;
+ using BucketNodePair = std::pair<document::Bucket, uint16_t>;
void revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes);
@@ -65,7 +65,7 @@ public:
void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override;
private:
- typedef std::vector<uint64_t> MessageBatch;
+ using MessageBatch = std::vector<uint64_t>;
std::vector<MessageBatch> _messageBatches;
PersistenceOperationMetricSet& _metric;
@@ -75,14 +75,18 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::TraceNode _trace;
framework::MilliSecTimer _requestTimer;
+ uint32_t _n_persistence_replies_total;
+ uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
- bool hasSentReply() const { return _reply.get() == 0; }
+ bool hasSentReply() const noexcept { return !_reply; }
bool shouldRevert() const;
+ bool has_majority_successful_replies() const noexcept;
+ bool has_minority_test_and_set_failure() const noexcept;
void sendReply(MessageSender& sender);
void updateFailureResult(const api::BucketInfoReply& reply);
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);