diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-12-13 09:54:07 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-12-13 10:04:48 +0000 |
commit | 16d71fbf178d22281ff2cc896c3c626d21ecb929 (patch) | |
tree | ec11e53608e032fc71cf344897275f85dba02886 /storage | |
parent | b0098c4d582e071dae1957d38ce4dc2b915e5184 (diff) |
Avoid fast past update restart race with concurrently created replica
After the recent change to allow safe path updates to be restarted
as fast path updates iff all observed document timestamps are equal,
a race condition regression was introduced. If the bucket that the
update operation was scheduled towards got a new replica concurrently
created _between_ the time that safe path Gets were sent and received,
it was possible for updates to be sent to inconsistent replicas. This
is because the Get and Update operations use the current database
state at _their_ start time, not a stable snapshot state from the start
time of the two phase update operation itself.
Add an explicit check that the replica state between sending Gets and
Updates is unchanged. If it has changed, a fast path restart is _not_
permitted.
Diffstat (limited to 'storage')
5 files changed, 58 insertions, 3 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 788ac1960dd..619c3151295 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -835,7 +835,10 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge } TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) { - do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart + // TODO find a way to test this case properly again since this test now triggers + // the "replica set has changed" check and does not actually restart with a fast + // update path. + do_test_ownership_changed_between_gets_and_second_phase(70, 70, 70); // Timestamps in sync -> Update restart } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) { @@ -1044,6 +1047,31 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do EXPECT_EQ(0, metrics.fast_path_restarts.getValue()); } +TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_altered_between_get_send_and_receive) { + setupDistributor(3, 3, "storage:3 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(true); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + // Replica set changes between time of Get requests sent and + // responses received. This may happen e.g. if concurrent mutations + // to the same bucket create a new replica. If this happens, we + // must not send the Update operations verbatim, as they will + // be started with the _current_ replica set, not the one that + // was present during the Get request. + BucketId bucket(0x400000000000cac4); // Always the same in the test. + addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 7d7cd37e848..a0177db3d3c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -243,6 +243,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard for (uint32_t i = 0; i < e->getNodeCount(); i++) { const BucketCopy& copy = e->getNodeRef(i); + // TODO this could ideally be a set + _replicas_in_db.emplace_back(e.getBucketId(), copy.getNode()); + if (!copy.valid()) { _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy); } else if (!copy.empty()) { diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index a30a9a721bd..0b3027b6a9c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -40,6 +40,10 @@ public: // Exposed for unit testing. TODO feels a bit dirty :I const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; } + const std::vector<std::pair<document::BucketId, uint16_t>>& replicas_in_db() const noexcept { + return _replicas_in_db; + } + private: class GroupId { public: @@ -88,6 +92,7 @@ private: PersistenceOperationMetricSet& _metric; framework::MilliSecTimer _operationTimer; + std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db; bool _has_replica_inconsistency; void sendReply(DistributorMessageSender& sender); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 4283386a1bc..a209c54cc47 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -41,7 +41,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _updateDocBucketId = idFactory.getBucketId(_updateCmd->getDocumentId()); } -TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() {} +TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default; namespace { @@ -183,6 +183,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric); GetOperation & op = *getOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender); + _replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time op.start(intermediate, _manager.getClock().getTimeInMillis()); transitionTo(SendState::GETS_SENT); @@ -408,7 +409,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) { return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() && reply.wasFound() && - reply.had_consistent_replicas()); + reply.had_consistent_replicas() && + replica_set_unchanged_after_get_operation()); +} + +bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const { + std::vector<BucketDatabase::Entry> entries; + _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries); + + std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now; + for (uint32_t j = 0; j < entries.size(); ++j) { + const auto& e = entries[j]; + for (uint32_t i = 0; i < e->getNodeCount(); i++) { + const auto& copy = e->getNodeRef(i); + replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode()); + } + } + return (replicas_in_db_now == _replicas_at_get_send_time); } void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 84bcf2beff8..44c463666fd 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -122,6 +122,7 @@ private: void replyWithTasFailure(DistributorMessageSender& sender, vespalib::stringref message); bool may_restart_with_fast_path(const api::GetReply& reply); + bool replica_set_unchanged_after_get_operation() const; void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender); UpdateMetricSet& _updateMetric; @@ -136,6 +137,7 @@ private: Mode _mode; mbus::TraceNode _trace; document::BucketId _updateDocBucketId; + std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time; bool _replySent; }; |