summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2020-01-14 19:41:24 +0100
committerGitHub <noreply@github.com>2020-01-14 19:41:24 +0100
commite48d8cedaf2985e1a71514edd12039f4dc394042 (patch)
tree4249e1489130066a2b8bf41dfa32e41dc14b3da0
parente9f37e78fac9a3ac457db3bd384decfcee3dd4af (diff)
parentf5fb42b6b5d95e2b57f78f0dfd97adc7fee0ff68 (diff)
Merge pull request #11761 from vespa-engine/vekterli/avoid-inconsistent-auto-created-document-versions-taking-precedence
Avoid inconsistent auto-created document versions taking precedence
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp70
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp48
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h2
5 files changed, 105 insertions, 21 deletions
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 1d6fb5fe2ea..844cf80e6b6 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -14,8 +14,6 @@
#include <vespa/vespalib/gtest/gtest.h>
using namespace document;
-using namespace storage;
-using namespace storage::distributor;
using namespace storage::api;
using namespace std;
using namespace storage::lib;
@@ -25,6 +23,8 @@ using config::FileSpec;
using vespalib::string;
using document::test::makeDocumentBucket;
+namespace storage::distributor {
+
struct UpdateOperationTest : Test, DistributorTestUtil {
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _html_type;
@@ -46,17 +46,18 @@ struct UpdateOperationTest : Test, DistributorTestUtil {
const api::ReturnCode& result = api::ReturnCode());
std::shared_ptr<UpdateOperation>
- sendUpdate(const std::string& bucketState);
+ sendUpdate(const std::string& bucketState, bool create_if_missing = false);
document::BucketId _bId;
};
std::shared_ptr<UpdateOperation>
-UpdateOperationTest::sendUpdate(const std::string& bucketState)
+UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing)
{
auto update = std::make_shared<document::DocumentUpdate>(
*_repo, *_html_type,
document::DocumentId("id:ns:" + _html_type->getName() + "::1"));
+ update->setCreateIfNonExistent(create_if_missing);
_bId = getExternalOperationHandler().getBucketId(update->getId());
@@ -70,7 +71,6 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState)
getDistributor().getMetrics().updates[msg->getLoadType()]);
}
-
void
UpdateOperationTest::replyToMessage(UpdateOperation& callback, DistributorMessageSenderStub& sender, uint32_t index,
uint64_t oldTimestamp, const api::BucketInfo& info, const api::ReturnCode& result)
@@ -183,3 +183,63 @@ TEST_F(UpdateOperationTest, test_and_set_failures_increment_tas_metric) {
EXPECT_EQ(1, metrics.failures.test_and_set_failed.getValue());
}
+// Create-if-missing updates have a rather finicky behavior in the backend, wherein they'll
+// set the timestamp of the previous document to that of the _new_ document timestamp if
+// the update ended up creating a document from scratch. This particular behavior confuses
+// the "after the fact" timestamp consistency checks, since it will seem like the document
+// that was created from scratch is a better candidate to force convergence towards rather
+// than the ones that actually updated an existing document.
+// We therefore detect this case specially and treat the received timestamps as if the
+// document updated had a timestamp of zero.
+// An alternative approach to this is to change the backend behavior by sending timestamps
+// of zero in this case, but this would cause complications during rolling upgrades that would
+// need explicit workaround logic anyway.
+TEST_F(UpdateOperationTest, create_if_missing_update_sentinel_timestamp_is_treated_as_zero_timestamp) {
+ setupDistributor(2, 2, "distributor:1 storage:2");
+ std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3", true));
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+
+ // For these tests, it's deterministic that the newly assigned timestamp
+ // is 100. Reply that we updated this timestamp on all nodes, implying
+ // that the document was auto-created.
+ replyToMessage(*cb, sender, 0, 100);
+ replyToMessage(*cb, sender, 1, 100);
+
+ ASSERT_EQ("UpdateReply(id:ns:text/html::1, BucketId(0x0000000000000000), "
+ "timestamp 100, timestamp of updated doc: 0) ReturnCode(NONE)",
+ sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(0, metrics.diverging_timestamp_updates.getValue());
+}
+
+TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest_non_auto_created_replica) {
+ setupDistributor(2, 3, "distributor:1 storage:3");
+ std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3", true));
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
+ replyToMessage(*cb, sender, 0, 100); // Newly created
+ replyToMessage(*cb, sender, 2, 80); // Too old and dusty; should not be picked.
+ replyToMessage(*cb, sender, 1, 90); // Should be picked
+
+ ASSERT_EQ("UpdateReply(id:ns:text/html::1, BucketId(0x0000000000000000), "
+ "timestamp 100, timestamp of updated doc: 90 Was inconsistent "
+ "(best node 1)) ReturnCode(NONE)",
+ sender.getLastReply(true));
+
+ auto newest = cb->getNewestTimestampLocation();
+ EXPECT_NE(newest.first, BucketId());
+ EXPECT_EQ(newest.second, 1);
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ // Implementation detail: since we get diverging results from nodes 2 and 1, these are
+ // counted as separate diverging updates.
+ EXPECT_EQ(2, metrics.diverging_timestamp_updates.getValue());
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 771bd90b247..3cf311936ea 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -2,6 +2,7 @@
#include "updateoperation.h"
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/distributormetricsset.h>
@@ -12,11 +13,10 @@
LOG_SETUP(".distributor.callback.doc.update");
-
-using namespace storage::distributor;
-using namespace storage;
using document::BucketSpace;
+namespace storage::distributor {
+
UpdateOperation::UpdateOperation(DistributorComponent& manager,
DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
@@ -26,6 +26,8 @@ UpdateOperation::UpdateOperation(DistributorComponent& manager,
manager, msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(msg),
+ _new_timestamp(_msg->getTimestamp()),
+ _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()),
_manager(manager),
_bucketSpace(bucketSpace),
_newestTimestampLocation(),
@@ -83,6 +85,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender)
"No buckets found for given document update"));
return;
}
+
// An UpdateOperation should only be started iff all replicas are consistent
// with each other, so sampling a single replica should be equal to sampling them all.
assert(entries[0].getBucketInfo().getNodeCount() > 0); // Empty buckets are not allowed
@@ -92,22 +95,22 @@ UpdateOperation::onStart(DistributorMessageSender& sender)
// bucket sub-tree, but there is nothing here at all which will fail the
// update if we cannot satisfy a desired replication level (not even for
// n-of-m operations).
- for (uint32_t j = 0; j < entries.size(); ++j) {
- LOG(spam, "Found bucket %s", entries[j].toString().c_str());
+ for (const auto& entry : entries) {
+ LOG(spam, "Found bucket %s", entry.toString().c_str());
- const std::vector<uint16_t>& nodes = entries[j]->getNodes();
+ const std::vector<uint16_t>& nodes = entry->getNodes();
std::vector<MessageTracker::ToSend> messages;
- for (uint32_t i = 0; i < nodes.size(); i++) {
- std::shared_ptr<api::UpdateCommand> command(
- new api::UpdateCommand(document::Bucket(_msg->getBucket().getBucketSpace(), entries[j].getBucketId()),
- _msg->getUpdate(),
- _msg->getTimestamp()));
+ for (uint16_t node : nodes) {
+ auto command = std::make_shared<api::UpdateCommand>(
+ document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()),
+ _msg->getUpdate(),
+ _msg->getTimestamp());
copyMessageSettings(*_msg, *command);
command->setOldTimestamp(_msg->getOldTimestamp());
command->setCondition(_msg->getCondition());
- messages.push_back(MessageTracker::ToSend(command, nodes[i]));
+ messages.emplace_back(std::move(command), node);
}
_tracker.queueMessageBatch(messages);
@@ -128,7 +131,8 @@ UpdateOperation::onReceive(DistributorMessageSender& sender,
if (node != (uint16_t)-1) {
if (reply.getResult().getResult() == api::ReturnCode::OK) {
- _results.emplace_back(reply.getBucketId(), reply.getBucketInfo(), reply.getOldTimestamp(), node);
+ _results.emplace_back(reply.getBucketId(), reply.getBucketInfo(),
+ adjusted_received_old_timestamp(reply.getOldTimestamp()), node);
}
if (_tracker.getReply().get()) {
@@ -179,9 +183,25 @@ UpdateOperation::onReceive(DistributorMessageSender& sender,
}
}
-
void
UpdateOperation::onClose(DistributorMessageSender& sender)
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+
+// The backend behavior of "create-if-missing" updates is to return the timestamp of the
+// _new_ update operation if the document was created from scratch. The two-phase update
+// operation logic auto-detects unexpected inconsistencies and tries to reconcile
+// replicas by forcing document versions to that assumed most likely to preserve the history
+// of the document. Normally this is the highest updated timestamp, so to avoid newly created
+// replicas from overwriting updates that actually updated existing document versions, treat
+// a received timestamp == new timestamp as if it were actually a timestamp of zero.
+// This mirrors the received timestamp for regular updates that do not find a matching document.
+api::Timestamp UpdateOperation::adjusted_received_old_timestamp(api::Timestamp old_ts_from_node) const {
+ if (!_is_auto_create_update) {
+ return old_ts_from_node;
+ }
+ return (old_ts_from_node != _new_timestamp) ? old_ts_from_node : api::Timestamp(0);
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 91491485056..c66026f02f5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -43,6 +43,8 @@ private:
PersistenceMessageTrackerImpl _trackerInstance;
PersistenceMessageTracker& _tracker;
std::shared_ptr<api::UpdateCommand> _msg;
+ const api::Timestamp _new_timestamp;
+ const bool _is_auto_create_update;
DistributorComponent& _manager;
DistributorBucketSpace &_bucketSpace;
@@ -64,6 +66,8 @@ private:
std::vector<PreviousDocumentVersion> _results;
UpdateMetricSet& _metrics;
+
+ api::Timestamp adjusted_received_old_timestamp(api::Timestamp old_ts_from_node) const;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 1519a9183ba..060eda2ffc5 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -85,7 +85,7 @@ PersistenceMessageTrackerImpl::receiveReply(
void
PersistenceMessageTrackerImpl::revert(
MessageSender& sender,
- const std::vector<BucketNodePair> revertNodes)
+ const std::vector<BucketNodePair>& revertNodes)
{
if (_revertTimestamp != 0) {
// Since we're reverting, all received bucket info is voided.
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 96200342e08..2b4e7d8edc1 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -54,7 +54,7 @@ public:
typedef std::pair<document::Bucket, uint16_t> BucketNodePair;
- void revert(MessageSender& sender, const std::vector<BucketNodePair> revertNodes);
+ void revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes);
/**
Sends a set of messages that are permissible for early return.