aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-12-13 09:54:07 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-12-13 10:04:48 +0000
commit16d71fbf178d22281ff2cc896c3c626d21ecb929 (patch)
treeec11e53608e032fc71cf344897275f85dba02886 /storage
parentb0098c4d582e071dae1957d38ce4dc2b915e5184 (diff)
Avoid fast past update restart race with concurrently created replica
After the recent change to allow safe path updates to be restarted as fast path updates iff all observed document timestamps are equal, a race condition regression was introduced. If the bucket that the update operation was scheduled towards got a new replica concurrently created _between_ the time that safe path Gets were sent and received, it was possible for updates to be sent to inconsistent replicas. This is because the Get and Update operations use the current database state at _their_ start time, not a stable snapshot state from the start time of the two phase update operation itself. Add an explicit check that the replica state between sending Gets and Updates is unchanged. If it has changed, a fast path restart is _not_ permitted.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp30
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h2
5 files changed, 58 insertions, 3 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 788ac1960dd..619c3151295 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -835,7 +835,10 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) {
- do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart
+ // TODO find a way to test this case properly again since this test now triggers
+ // the "replica set has changed" check and does not actually restart with a fast
+ // update path.
+ do_test_ownership_changed_between_gets_and_second_phase(70, 70, 70); // Timestamps in sync -> Update restart
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
@@ -1044,6 +1047,31 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
}
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_altered_between_get_send_and_receive) {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ // Replica set changes between time of Get requests sent and
+ // responses received. This may happen e.g. if concurrent mutations
+ // to the same bucket create a new replica. If this happens, we
+ // must not send the Update operations verbatim, as they will
+ // be started with the _current_ replica set, not the one that
+ // was present during the Get request.
+ BucketId bucket(0x400000000000cac4); // Always the same in the test.
+ addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 7d7cd37e848..a0177db3d3c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -243,6 +243,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const BucketCopy& copy = e->getNodeRef(i);
+ // TODO this could ideally be a set
+ _replicas_in_db.emplace_back(e.getBucketId(), copy.getNode());
+
if (!copy.valid()) {
_responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy);
} else if (!copy.empty()) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index a30a9a721bd..0b3027b6a9c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -40,6 +40,10 @@ public:
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
+ const std::vector<std::pair<document::BucketId, uint16_t>>& replicas_in_db() const noexcept {
+ return _replicas_in_db;
+ }
+
private:
class GroupId {
public:
@@ -88,6 +92,7 @@ private:
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
+ std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
bool _has_replica_inconsistency;
void sendReply(DistributorMessageSender& sender);
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 4283386a1bc..a209c54cc47 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -41,7 +41,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_updateDocBucketId = idFactory.getBucketId(_updateCmd->getDocumentId());
}
-TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() {}
+TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default;
namespace {
@@ -183,6 +183,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
_manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric);
GetOperation & op = *getOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
+ _replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
op.start(intermediate, _manager.getClock().getTimeInMillis());
transitionTo(SendState::GETS_SENT);
@@ -408,7 +409,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
reply.wasFound() &&
- reply.had_consistent_replicas());
+ reply.had_consistent_replicas() &&
+ replica_set_unchanged_after_get_operation());
+}
+
+bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const {
+ std::vector<BucketDatabase::Entry> entries;
+ _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries);
+
+ std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now;
+ for (uint32_t j = 0; j < entries.size(); ++j) {
+ const auto& e = entries[j];
+ for (uint32_t i = 0; i < e->getNodeCount(); i++) {
+ const auto& copy = e->getNodeRef(i);
+ replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode());
+ }
+ }
+ return (replicas_in_db_now == _replicas_at_get_send_time);
}
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 84bcf2beff8..44c463666fd 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -122,6 +122,7 @@ private:
void replyWithTasFailure(DistributorMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
+ bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
UpdateMetricSet& _updateMetric;
@@ -136,6 +137,7 @@ private:
Mode _mode;
mbus::TraceNode _trace;
document::BucketId _updateDocBucketId;
+ std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
bool _replySent;
};