summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-08-26 11:33:06 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-08-26 11:33:06 +0000
commit3659ad0d235b1a067f3fce597f92cbd209bde817 (patch)
treeca0fa579dd67cb999a7b934fc288965095102b2f /storage/src
parent02959f16902f8c76f0878840cd8685b2ffe1d50d (diff)
Treat replica minority test-and-set failure as success
Since Put/Remove operations do not have a write-repair pipeline when replicas are out of sync/missing, any such TaS mutation hitting a stale replica will fail. This will normally fail the entire operation back to the client with a TaS failure even though the mutation will eventually become visible in the cluster. This is particularly noticeable for documents that are globally distributed across all nodes. This commit introduces a pragmatic approach to this issue where TaS failures will be effectively ignored iff a _majority_ of replicas have ACKed the mutation successfully. This is a tradeoff where we don't want to "lie" to the client about the level of redundancy of the ACKed operation, but also don't want to introduce too many client resends due to spurious failures. Requiring a majority is a good middle ground (and is what we would like to move towards in the future with a majority consensus system anyway).
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp70
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h18
3 files changed, 98 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index f5bacfae252..16d5dcf3b25 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -72,11 +72,11 @@ public:
}
void sendPut(std::shared_ptr<api::PutCommand> msg) {
- op.reset(new PutOperation(getExternalOperationHandler(),
- getDistributorBucketSpace(),
- msg,
- getDistributor().getMetrics().
- puts[msg->getLoadType()]));
+ op = std::make_unique<PutOperation>(getExternalOperationHandler(),
+ getDistributorBucketSpace(),
+ msg,
+ getDistributor().getMetrics().
+ puts[msg->getLoadType()]);
op->start(_sender, framework::MilliSecTime(0));
}
@@ -88,9 +88,11 @@ public:
return std::make_shared<Document>(doc_type(), DocumentId(vespalib::make_string("id:%s:testdoctype1::%s", ns, id)));
}
- std::shared_ptr<api::PutCommand> createPut(Document::SP doc) const {
+ static std::shared_ptr<api::PutCommand> createPut(Document::SP doc) {
return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100);
}
+
+ void set_up_3_nodes_and_send_put_with_create_bucket_acks();
};
PutOperationTest::~PutOperationTest() = default;
@@ -624,4 +626,60 @@ TEST_F(PutOperationTest, replica_not_implicitly_activated_when_activation_is_dis
do_test_creation_with_bucket_activation_disabled(true);
}
+void PutOperationTest::set_up_3_nodes_and_send_put_with_create_bucket_acks() {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+
+ Document::SP doc(createDummyDocument("test", "test"));
+ sendPut(createPut(doc));
+
+ // Include CreateBucket to ensure we don't count it towards Put ACK majorities
+ ASSERT_EQ("Create bucket => 2,Create bucket => 1,Create bucket => 0,"
+ "Put => 2,Put => 1,Put => 0",
+ _sender.getCommands(true));
+
+ // ACK all CreateBuckets
+ for (uint32_t i = 0; i < 3; ++i) {
+ sendReply(i);
+ }
+}
+
+TEST_F(PutOperationTest, majority_ack_with_minority_tas_failure_returns_success) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Majority ACK, minority NACK
+ sendReply(3);
+ sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+ sendReply(5);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, minority_ack_with_majority_tas_failure_returns_tas_failure) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Minority ACK, majority NACK
+ sendReply(3);
+ sendReply(4, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+ sendReply(5, api::ReturnCode::TEST_AND_SET_CONDITION_FAILED);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(TEST_AND_SET_CONDITION_FAILED)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_errors) {
+ ASSERT_NO_FATAL_FAILURE(set_up_3_nodes_and_send_put_with_create_bucket_acks());
+ // Minority ACK, majority NACK. But non-TaS failure.
+ sendReply(3);
+ sendReply(4, api::ReturnCode::ABORTED);
+ sendReply(5);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(ABORTED)",
+ _sender.getLastReply());
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index d98a2ce6a74..6362db6c002 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -2,7 +2,6 @@
#include "persistencemessagetracker.h"
#include <vespa/storage/common/vectorprinter.h>
-#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
@@ -24,6 +23,8 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
_manager(link),
_revertTimestamp(revertTimestamp),
_requestTimer(link.getClock()),
+ _n_persistence_replies_total(0),
+ _n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
_success(true)
{
@@ -204,9 +205,28 @@ PersistenceMessageTrackerImpl::shouldRevert() const
&& !_revertNodes.empty() && !_success && _reply;
}
+bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept {
+ // FIXME this has questionable interaction with early client ACK since we only count
+ // the number of observed replies rather than the number of total requests sent.
+ // ... but the early ACK-feature dearly needs a redesign anyway.
+ return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1));
+}
+
+bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept {
+ return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)
+ && has_majority_successful_replies());
+}
+
void
PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
{
+ // If we've observed _partial_ TaS failures but have had a majority of good ACKs,
+ // treat the reply as successful. This is because the ACKed write(s) will eventually
+ // become visible across all nodes.
+ if (has_minority_test_and_set_failure()) {
+ _reply->setResult(api::ReturnCode());
+ }
+
updateMetrics();
_trace.setStrict(false);
if ( ! _trace.isEmpty()) {
@@ -245,11 +265,6 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
_manager.removeNodeFromDB(reply.getBucket(), node);
- LOG_BUCKET_OPERATION_NO_LOCK(
- reply.getBucketId(),
- vespalib::make_string(
- "Deleted bucket on node %u due to failing create bucket %s",
- node, reply.getResult().toString().c_str()));
}
}
@@ -258,12 +273,14 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(
api::BucketInfoReply& reply,
uint16_t node)
{
+ ++_n_persistence_replies_total;
if (reply.getBucketInfo().valid()) {
addBucketInfoFromReply(node, reply);
}
if (reply.getResult().success()) {
logSuccessfulReply(node, reply);
_revertNodes.emplace_back(reply.getBucket(), node);
+ ++_n_successful_persistence_replies;
} else if (!hasSentReply()) {
updateFailureResult(reply);
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 21977ddd881..4392aa1fc30 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -12,8 +12,8 @@
namespace storage::distributor {
struct PersistenceMessageTracker {
- virtual ~PersistenceMessageTracker() { }
- typedef MessageTracker::ToSend ToSend;
+ virtual ~PersistenceMessageTracker() = default;
+ using ToSend = MessageTracker::ToSend;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
virtual void queueMessageBatch(const std::vector<ToSend>&) = 0;
@@ -29,7 +29,7 @@ class PersistenceMessageTrackerImpl : public PersistenceMessageTracker,
public MessageTracker
{
private:
- typedef std::map<document::Bucket, std::vector<BucketCopy> > BucketInfoMap;
+ using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
BucketInfoMap _remapBucketInfo;
BucketInfoMap _bucketInfo;
@@ -38,7 +38,7 @@ public:
std::shared_ptr<api::BucketInfoReply> reply,
DistributorComponent&,
api::Timestamp revertTimestamp = 0);
- ~PersistenceMessageTrackerImpl();
+ ~PersistenceMessageTrackerImpl() override;
void updateDB();
void updateMetrics();
@@ -52,7 +52,7 @@ public:
void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override;
std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; }
- typedef std::pair<document::Bucket, uint16_t> BucketNodePair;
+ using BucketNodePair = std::pair<document::Bucket, uint16_t>;
void revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes);
@@ -65,7 +65,7 @@ public:
void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override;
private:
- typedef std::vector<uint64_t> MessageBatch;
+ using MessageBatch = std::vector<uint64_t>;
std::vector<MessageBatch> _messageBatches;
PersistenceOperationMetricSet& _metric;
@@ -75,14 +75,18 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::TraceNode _trace;
framework::MilliSecTimer _requestTimer;
+ uint32_t _n_persistence_replies_total;
+ uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
- bool hasSentReply() const { return _reply.get() == 0; }
+ bool hasSentReply() const noexcept { return !_reply; }
bool shouldRevert() const;
+ bool has_majority_successful_replies() const noexcept;
+ bool has_minority_test_and_set_failure() const noexcept;
void sendReply(MessageSender& sender);
void updateFailureResult(const api::BucketInfoReply& reply);
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);