diff options
Diffstat (limited to 'storage')
5 files changed, 105 insertions, 21 deletions
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 1d6fb5fe2ea..844cf80e6b6 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -14,8 +14,6 @@ #include <vespa/vespalib/gtest/gtest.h> using namespace document; -using namespace storage; -using namespace storage::distributor; using namespace storage::api; using namespace std; using namespace storage::lib; @@ -25,6 +23,8 @@ using config::FileSpec; using vespalib::string; using document::test::makeDocumentBucket; +namespace storage::distributor { + struct UpdateOperationTest : Test, DistributorTestUtil { std::shared_ptr<const DocumentTypeRepo> _repo; const DocumentType* _html_type; @@ -46,17 +46,18 @@ struct UpdateOperationTest : Test, DistributorTestUtil { const api::ReturnCode& result = api::ReturnCode()); std::shared_ptr<UpdateOperation> - sendUpdate(const std::string& bucketState); + sendUpdate(const std::string& bucketState, bool create_if_missing = false); document::BucketId _bId; }; std::shared_ptr<UpdateOperation> -UpdateOperationTest::sendUpdate(const std::string& bucketState) +UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing) { auto update = std::make_shared<document::DocumentUpdate>( *_repo, *_html_type, document::DocumentId("id:ns:" + _html_type->getName() + "::1")); + update->setCreateIfNonExistent(create_if_missing); _bId = getExternalOperationHandler().getBucketId(update->getId()); @@ -70,7 +71,6 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState) getDistributor().getMetrics().updates[msg->getLoadType()]); } - void UpdateOperationTest::replyToMessage(UpdateOperation& callback, DistributorMessageSenderStub& sender, uint32_t index, uint64_t oldTimestamp, const api::BucketInfo& info, const api::ReturnCode& result) @@ -183,3 +183,63 @@ TEST_F(UpdateOperationTest, 
test_and_set_failures_increment_tas_metric) { EXPECT_EQ(1, metrics.failures.test_and_set_failed.getValue()); } +// Create-if-missing updates have a rather finicky behavior in the backend, wherein they'll +// set the timestamp of the previous document to that of the _new_ document timestamp if +// the update ended up creating a document from scratch. This particular behavior confuses +// the "after the fact" timestamp consistency checks, since it will seem like the document +// that was created from scratch is a better candidate to force convergence towards rather +// than the ones that actually updated an existing document. +// We therefore detect this case specially and treat the received timestamps as if the +// document updated had a timestamp of zero. +// An alternative approach to this is to change the backend behavior by sending timestamps +// of zero in this case, but this would cause complications during rolling upgrades that would +// need explicit workaround logic anyway. +TEST_F(UpdateOperationTest, create_if_missing_update_sentinel_timestamp_is_treated_as_zero_timestamp) { + setupDistributor(2, 2, "distributor:1 storage:2"); + std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3", true)); + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); + + // For these tests, it's deterministic that the newly assigned timestamp + // is 100. Reply that we updated this timestamp on all nodes, implying + // that the document was auto-created. 
+ replyToMessage(*cb, sender, 0, 100); + replyToMessage(*cb, sender, 1, 100); + + ASSERT_EQ("UpdateReply(id:ns:text/html::1, BucketId(0x0000000000000000), " + "timestamp 100, timestamp of updated doc: 0) ReturnCode(NONE)", + sender.getLastReply(true)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(0, metrics.diverging_timestamp_updates.getValue()); +} + +TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest_non_auto_created_replica) { + setupDistributor(2, 3, "distributor:1 storage:3"); + std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3", true)); + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true)); + replyToMessage(*cb, sender, 0, 100); // Newly created + replyToMessage(*cb, sender, 2, 80); // Too old and dusty; should not be picked. + replyToMessage(*cb, sender, 1, 90); // Should be picked + + ASSERT_EQ("UpdateReply(id:ns:text/html::1, BucketId(0x0000000000000000), " + "timestamp 100, timestamp of updated doc: 90 Was inconsistent " + "(best node 1)) ReturnCode(NONE)", + sender.getLastReply(true)); + + auto newest = cb->getNewestTimestampLocation(); + EXPECT_NE(newest.first, BucketId()); + EXPECT_EQ(newest.second, 1); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + // Implementation detail: since we get diverging results from nodes 2 and 1, these are + // counted as separate diverging updates. 
+ EXPECT_EQ(2, metrics.diverging_timestamp_updates.getValue()); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 771bd90b247..3cf311936ea 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -2,6 +2,7 @@ #include "updateoperation.h" #include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/distributormetricsset.h> @@ -12,11 +13,10 @@ LOG_SETUP(".distributor.callback.doc.update"); - -using namespace storage::distributor; -using namespace storage; using document::BucketSpace; +namespace storage::distributor { + UpdateOperation::UpdateOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, @@ -26,6 +26,8 @@ UpdateOperation::UpdateOperation(DistributorComponent& manager, manager, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), + _new_timestamp(_msg->getTimestamp()), + _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), _manager(manager), _bucketSpace(bucketSpace), _newestTimestampLocation(), @@ -83,6 +85,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender) "No buckets found for given document update")); return; } + // An UpdateOperation should only be started iff all replicas are consistent // with each other, so sampling a single replica should be equal to sampling them all. 
assert(entries[0].getBucketInfo().getNodeCount() > 0); // Empty buckets are not allowed @@ -92,22 +95,22 @@ UpdateOperation::onStart(DistributorMessageSender& sender) // bucket sub-tree, but there is nothing here at all which will fail the // update if we cannot satisfy a desired replication level (not even for // n-of-m operations). - for (uint32_t j = 0; j < entries.size(); ++j) { - LOG(spam, "Found bucket %s", entries[j].toString().c_str()); + for (const auto& entry : entries) { + LOG(spam, "Found bucket %s", entry.toString().c_str()); - const std::vector<uint16_t>& nodes = entries[j]->getNodes(); + const std::vector<uint16_t>& nodes = entry->getNodes(); std::vector<MessageTracker::ToSend> messages; - for (uint32_t i = 0; i < nodes.size(); i++) { - std::shared_ptr<api::UpdateCommand> command( - new api::UpdateCommand(document::Bucket(_msg->getBucket().getBucketSpace(), entries[j].getBucketId()), - _msg->getUpdate(), - _msg->getTimestamp())); + for (uint16_t node : nodes) { + auto command = std::make_shared<api::UpdateCommand>( + document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()), + _msg->getUpdate(), + _msg->getTimestamp()); copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); - messages.push_back(MessageTracker::ToSend(command, nodes[i])); + messages.emplace_back(std::move(command), node); } _tracker.queueMessageBatch(messages); @@ -128,7 +131,8 @@ UpdateOperation::onReceive(DistributorMessageSender& sender, if (node != (uint16_t)-1) { if (reply.getResult().getResult() == api::ReturnCode::OK) { - _results.emplace_back(reply.getBucketId(), reply.getBucketInfo(), reply.getOldTimestamp(), node); + _results.emplace_back(reply.getBucketId(), reply.getBucketInfo(), + adjusted_received_old_timestamp(reply.getOldTimestamp()), node); } if (_tracker.getReply().get()) { @@ -179,9 +183,25 @@ UpdateOperation::onReceive(DistributorMessageSender& sender, } } - void 
UpdateOperation::onClose(DistributorMessageSender& sender) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } + +// The backend behavior of "create-if-missing" updates is to return the timestamp of the +// _new_ update operation if the document was created from scratch. The two-phase update +// operation logic auto-detects unexpected inconsistencies and tries to reconcile +// replicas by forcing document versions to that assumed most likely to preserve the history +// of the document. Normally this is the highest updated timestamp, so to prevent newly created +// replicas from overwriting updates that actually updated existing document versions, treat +// a received timestamp == new timestamp as if it were actually a timestamp of zero. +// This mirrors the received timestamp for regular updates that do not find a matching document. +api::Timestamp UpdateOperation::adjusted_received_old_timestamp(api::Timestamp old_ts_from_node) const { + if (!_is_auto_create_update) { + return old_ts_from_node; + } + return (old_ts_from_node != _new_timestamp) ? 
old_ts_from_node : api::Timestamp(0); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 91491485056..c66026f02f5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -43,6 +43,8 @@ private: PersistenceMessageTrackerImpl _trackerInstance; PersistenceMessageTracker& _tracker; std::shared_ptr<api::UpdateCommand> _msg; + const api::Timestamp _new_timestamp; + const bool _is_auto_create_update; DistributorComponent& _manager; DistributorBucketSpace &_bucketSpace; @@ -64,6 +66,8 @@ private: std::vector<PreviousDocumentVersion> _results; UpdateMetricSet& _metrics; + + api::Timestamp adjusted_received_old_timestamp(api::Timestamp old_ts_from_node) const; }; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index 1519a9183ba..060eda2ffc5 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -85,7 +85,7 @@ PersistenceMessageTrackerImpl::receiveReply( void PersistenceMessageTrackerImpl::revert( MessageSender& sender, - const std::vector<BucketNodePair> revertNodes) + const std::vector<BucketNodePair>& revertNodes) { if (_revertTimestamp != 0) { // Since we're reverting, all received bucket info is voided. 
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 96200342e08..2b4e7d8edc1 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -54,7 +54,7 @@ public: typedef std::pair<document::Bucket, uint16_t> BucketNodePair; - void revert(MessageSender& sender, const std::vector<BucketNodePair> revertNodes); + void revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes); /** Sends a set of messages that are permissible for early return. |