summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-04-25 15:23:27 +0200
committerGitHub <noreply@github.com>2017-04-25 15:23:27 +0200
commit1db9d2d06e8b7ea59b48ffe2f87f1e176c807806 (patch)
tree470fbf2b1261b510b4f68ce66e82828aa65355d8 /storage
parentdf1e3f6677466625e5d11046163ec5060ec2b9b9 (diff)
Add operation sequencing for put, remove and update operations (#2252)
Keeps track of GIDs of pending operations and bounces operations that arrive to these GIDs before the original operation has completed. RemoveLocation operations are currently not handled due to these covering an a priori unknown set of GIDs, but sequencer can be extended to support these as well with some extra work. Enabled by default, but may be disabled via config.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/distributortest.cpp28
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp229
-rw-r--r--storage/src/tests/distributor/operation_sequencer_test.cpp62
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def6
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp56
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h6
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/operation_sequencer.h85
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/sequenced_operation.h25
19 files changed, 543 insertions, 47 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 2676a31e832..4d053fc9fa1 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -40,6 +40,7 @@ vespa_add_library(storage_testdistributor TEST
distributor_host_info_reporter_test.cpp
ownership_transfer_safe_time_point_calculator_test.cpp
persistence_metrics_set_test.cpp
+ operation_sequencer_test.cpp
DEPENDS
storage_distributor
storage_testcommon
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 10b0d035770..298a4283bfa 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -44,6 +44,7 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(bucketActivationConfigIsPropagatedToDistributorConfiguration);
CPPUNIT_TEST(max_clock_skew_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(configured_safe_time_point_rejection_works_end_to_end);
+ CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -70,6 +71,7 @@ protected:
void bucketActivationConfigIsPropagatedToDistributorConfiguration();
void max_clock_skew_config_is_propagated_to_distributor_config();
void configured_safe_time_point_rejection_works_end_to_end();
+ void sequencing_config_is_propagated_to_distributor_config();
public:
void setUp() override {
@@ -173,6 +175,7 @@ private:
void sendDownDummyRemoveCommand();
void assertSingleBouncedRemoveReplyPresent();
void assertNoMessageBounced();
+ void configure_mutation_sequencing(bool enabled);
};
CPPUNIT_TEST_SUITE_REGISTRATION(Distributor_Test);
@@ -790,6 +793,31 @@ Distributor_Test::configured_safe_time_point_rejection_works_end_to_end() {
assertNoMessageBounced();
}
+void Distributor_Test::configure_mutation_sequencing(bool enabled) {
+ using namespace vespa::config::content::core;
+ using ConfigBuilder = StorDistributormanagerConfigBuilder;
+
+ ConfigBuilder builder;
+ builder.sequenceMutatingOperations = enabled;
+ getConfig().configure(builder);
+ _distributor->enableNextConfig();
+}
+
+void Distributor_Test::sequencing_config_is_propagated_to_distributor_config() {
+ setupDistributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+
+ // Should be enabled by default
+ CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations());
+
+ // Explicitly disabled.
+ configure_mutation_sequencing(false);
+ CPPUNIT_ASSERT(!getConfig().getSequenceMutatingOperations());
+
+ // Explicitly enabled.
+ configure_mutation_sequencing(true);
+ CPPUNIT_ASSERT(getConfig().getSequenceMutatingOperations());
+}
+
}
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index c63db0c01e6..d9423c03a41 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -2,6 +2,7 @@
#include <tests/distributor/distributortestutil.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/distributor.h>
@@ -22,6 +23,16 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
CPPUNIT_TEST(get_not_rejected_by_unsafe_time_point);
CPPUNIT_TEST(mutation_not_rejected_when_safe_point_reached);
+ CPPUNIT_TEST(reject_put_with_concurrent_mutation_to_same_id);
+ CPPUNIT_TEST(do_not_reject_put_operations_to_different_ids);
+ CPPUNIT_TEST(reject_remove_with_concurrent_mutation_to_same_id);
+ CPPUNIT_TEST(do_not_reject_remove_operations_to_different_ids);
+ CPPUNIT_TEST(reject_update_with_concurrent_mutation_to_same_id);
+ CPPUNIT_TEST(do_not_reject_update_operations_to_different_ids);
+ CPPUNIT_TEST(operation_destruction_allows_new_mutations_for_id);
+ CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict);
+ CPPUNIT_TEST(sequencing_works_across_mutation_types);
+ CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled);
CPPUNIT_TEST_SUITE_END();
document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state);
@@ -29,14 +40,28 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
vespalib::stringref state1,
vespalib::stringref state2);
- std::shared_ptr<api::GetCommand> makeGetCommandForUser(uint64_t id);
- std::shared_ptr<api::UpdateCommand> makeUpdateCommand();
+ std::shared_ptr<api::GetCommand> makeGetCommandForUser(uint64_t id) const;
+ std::shared_ptr<api::GetCommand> makeGetCommand(const vespalib::string& id) const;
+ std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type,
+ const vespalib::string& id) const;
+ std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const;
+ std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type,
+ const vespalib::string& id) const;
+ std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
+
+ Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd);
+ void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd);
int64_t safe_time_not_reached_metric_count(
const metrics::LoadMetric<PersistenceOperationMetricSet>& metrics) const {
return metrics[documentapi::LoadType::DEFAULT].failures
.safe_time_not_reached.getLongValue("count");
}
+
+ void set_up_distributor_for_sequencing_test();
+
+ const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"};
+
protected:
void testBucketSplitMask();
void testOperationRejectedOnWrongDistribution();
@@ -46,10 +71,28 @@ protected:
void reject_update_if_not_past_safe_time_point();
void get_not_rejected_by_unsafe_time_point();
void mutation_not_rejected_when_safe_point_reached();
+ void reject_put_with_concurrent_mutation_to_same_id();
+ void do_not_reject_put_operations_to_different_ids();
+ void reject_remove_with_concurrent_mutation_to_same_id();
+ void do_not_reject_remove_operations_to_different_ids();
+ void reject_update_with_concurrent_mutation_to_same_id();
+ void do_not_reject_update_operations_to_different_ids();
+ void operation_destruction_allows_new_mutations_for_id();
+ void concurrent_get_and_mutation_do_not_conflict();
+ void sequencing_works_across_mutation_types();
+ void sequencing_can_be_explicitly_config_disabled();
void assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd);
+ void assert_second_command_rejected_due_to_concurrent_mutation(
+ std::shared_ptr<api::StorageCommand> cmd1,
+ std::shared_ptr<api::StorageCommand> cmd2,
+ const vespalib::string& expected_id_in_message);
+ void assert_second_command_not_rejected_due_to_concurrent_mutation(
+ std::shared_ptr<api::StorageCommand> cmd1,
+ std::shared_ptr<api::StorageCommand> cmd2);
+
public:
void tearDown() override {
close();
@@ -59,6 +102,8 @@ public:
CPPUNIT_TEST_SUITE_REGISTRATION(ExternalOperationHandlerTest);
+using document::DocumentId;
+
void
ExternalOperationHandlerTest::testBucketSplitMask()
{
@@ -131,23 +176,43 @@ ExternalOperationHandlerTest::findOwned1stNotOwned2ndInStates(
}
std::shared_ptr<api::GetCommand>
-ExternalOperationHandlerTest::makeGetCommandForUser(uint64_t id)
-{
- document::DocumentId docId(document::UserDocIdString("userdoc:foo:" + vespalib::make_string("%lu", id) + ":bar"));
- return std::make_shared<api::GetCommand>(
- document::BucketId(0), docId, "[all]");
+ExternalOperationHandlerTest::makeGetCommand(const vespalib::string& id) const {
+ return std::make_shared<api::GetCommand>(document::BucketId(0), DocumentId(id), "[all]");
}
-std::shared_ptr<api::UpdateCommand>
-ExternalOperationHandlerTest::makeUpdateCommand()
-{
+std::shared_ptr<api::GetCommand>
+ExternalOperationHandlerTest::makeGetCommandForUser(uint64_t id) const {
+ DocumentId docId(document::UserDocIdString(vespalib::make_string("userdoc:foo:%lu:bar", id)));
+ return std::make_shared<api::GetCommand>(document::BucketId(0), docId, "[all]");
+}
+
+std::shared_ptr<api::UpdateCommand> ExternalOperationHandlerTest::makeUpdateCommand(
+ const vespalib::string& doc_type,
+ const vespalib::string& id) const {
auto update = std::make_shared<document::DocumentUpdate>(
- *_testDocMan.getTypeRepo().getDocumentType("testdoctype1"),
- document::DocumentId("id:foo:testdoctype1::baz"));
+ *_testDocMan.getTypeRepo().getDocumentType(doc_type),
+ document::DocumentId(id));
return std::make_shared<api::UpdateCommand>(
document::BucketId(0), std::move(update), api::Timestamp(0));
}
+std::shared_ptr<api::UpdateCommand>
+ExternalOperationHandlerTest::makeUpdateCommand() const {
+ return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz");
+}
+
+std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand(
+ const vespalib::string& doc_type,
+ const vespalib::string& id) const {
+ auto doc = _testDocMan.createDocument(doc_type, id);
+ return std::make_shared<api::PutCommand>(
+ document::BucketId(0), std::move(doc), api::Timestamp(0));
+}
+
+std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveCommand(const vespalib::string& id) const {
+ return std::make_shared<api::RemoveCommand>(document::BucketId(0), DocumentId(id), api::Timestamp(0));
+}
+
void
ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution()
{
@@ -224,18 +289,13 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time(
}
void ExternalOperationHandlerTest::reject_put_if_not_past_safe_time_point() {
- auto doc = _testDocMan.createDocument("foo", "id:foo:testdoctype1::bar");
- auto cmd = std::make_shared<api::PutCommand>(
- document::BucketId(0), std::move(doc), api::Timestamp(0));
- assert_rejection_due_to_unsafe_time(cmd);
+ assert_rejection_due_to_unsafe_time(makePutCommand("foo", "id:foo:testdoctype1::bar"));
CPPUNIT_ASSERT_EQUAL(int64_t(1), safe_time_not_reached_metric_count(
getDistributor().getMetrics().puts));
}
void ExternalOperationHandlerTest::reject_remove_if_not_past_safe_time_point() {
- document::DocumentId id("id:foo:testdoctype1::bar");
- assert_rejection_due_to_unsafe_time(std::make_shared<api::RemoveCommand>(
- document::BucketId(0), id, api::Timestamp(0)));
+ assert_rejection_due_to_unsafe_time(makeRemoveCommand("id:foo:testdoctype1::bar"));
CPPUNIT_ASSERT_EQUAL(int64_t(1), safe_time_not_reached_metric_count(
getDistributor().getMetrics().removes));
}
@@ -268,7 +328,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
Operation::SP generated;
- document::DocumentId id("id:foo:testdoctype1::bar");
+ DocumentId id("id:foo:testdoctype1::bar");
getExternalOperationHandler().handleMessage(
std::make_shared<api::RemoveCommand>(
document::BucketId(0), id, api::Timestamp(0)),
@@ -279,5 +339,134 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached
getDistributor().getMetrics().removes));
}
+void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
+ createLinks();
+ setupDistributor(1, 2, "distributor:1 storage:1");
+}
+
+Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected(
+ std::shared_ptr<api::StorageCommand> cmd) {
+ Operation::SP generated;
+ _sender.replies.clear();
+ getExternalOperationHandler().handleMessage(cmd, generated);
+ CPPUNIT_ASSERT(generated.get() != nullptr);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size());
+ return generated;
+}
+void ExternalOperationHandlerTest::start_operation_verify_rejected(
+ std::shared_ptr<api::StorageCommand> cmd) {
+ Operation::SP generated;
+ _sender.replies.clear();
+ getExternalOperationHandler().handleMessage(cmd, generated);
+ CPPUNIT_ASSERT(generated.get() == nullptr);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+}
+
+void ExternalOperationHandlerTest::assert_second_command_rejected_due_to_concurrent_mutation(
+ std::shared_ptr<api::StorageCommand> cmd1,
+ std::shared_ptr<api::StorageCommand> cmd2,
+ const vespalib::string& expected_id_in_message) {
+ set_up_distributor_for_sequencing_test();
+
+ // Must hold ref to started operation, or sequencing handle will be released.
+ Operation::SP generated1 = start_operation_verify_not_rejected(std::move(cmd1));
+ start_operation_verify_rejected(std::move(cmd2));
+
+ // TODO reconsider BUSY return code. Need something transient and non-noisy
+ CPPUNIT_ASSERT_EQUAL(
+ std::string(vespalib::make_string(
+ "ReturnCode(BUSY, A mutating operation for document "
+ "'%s' is already in progress)", expected_id_in_message.c_str())),
+ _sender.replies[0]->getResult().toString());
+}
+
+void ExternalOperationHandlerTest::assert_second_command_not_rejected_due_to_concurrent_mutation(
+ std::shared_ptr<api::StorageCommand> cmd1,
+ std::shared_ptr<api::StorageCommand> cmd2) {
+ set_up_distributor_for_sequencing_test();
+
+ Operation::SP generated1 = start_operation_verify_not_rejected(std::move(cmd1));
+ start_operation_verify_not_rejected(std::move(cmd2));
+}
+
+void ExternalOperationHandlerTest::reject_put_with_concurrent_mutation_to_same_id() {
+ assert_second_command_rejected_due_to_concurrent_mutation(
+ makePutCommand("testdoctype1", _dummy_id),
+ makePutCommand("testdoctype1", _dummy_id), _dummy_id);
+}
+
+void ExternalOperationHandlerTest::do_not_reject_put_operations_to_different_ids() {
+ assert_second_command_not_rejected_due_to_concurrent_mutation(
+ makePutCommand("testdoctype1", "id:foo:testdoctype1::baz"),
+ makePutCommand("testdoctype1", "id:foo:testdoctype1::foo"));
+}
+
+void ExternalOperationHandlerTest::reject_remove_with_concurrent_mutation_to_same_id() {
+ assert_second_command_rejected_due_to_concurrent_mutation(
+ makeRemoveCommand(_dummy_id), makeRemoveCommand(_dummy_id), _dummy_id);
+}
+
+void ExternalOperationHandlerTest::do_not_reject_remove_operations_to_different_ids() {
+ assert_second_command_not_rejected_due_to_concurrent_mutation(
+ makeRemoveCommand("id:foo:testdoctype1::baz"),
+ makeRemoveCommand("id:foo:testdoctype1::foo"));
+}
+
+void ExternalOperationHandlerTest::reject_update_with_concurrent_mutation_to_same_id() {
+ assert_second_command_rejected_due_to_concurrent_mutation(
+ makeUpdateCommand("testdoctype1", _dummy_id),
+ makeUpdateCommand("testdoctype1", _dummy_id), _dummy_id);
+}
+
+void ExternalOperationHandlerTest::do_not_reject_update_operations_to_different_ids() {
+ assert_second_command_not_rejected_due_to_concurrent_mutation(
+ makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"),
+ makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo"));
+}
+
+void ExternalOperationHandlerTest::operation_destruction_allows_new_mutations_for_id() {
+ set_up_distributor_for_sequencing_test();
+
+ Operation::SP generated = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
+
+ generated.reset(); // Implicitly release sequencing handle
+
+ start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
+}
+
+void ExternalOperationHandlerTest::concurrent_get_and_mutation_do_not_conflict() {
+ set_up_distributor_for_sequencing_test();
+
+ Operation::SP generated1 = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
+
+ start_operation_verify_not_rejected(makeGetCommand(_dummy_id));
+}
+
+void ExternalOperationHandlerTest::sequencing_works_across_mutation_types() {
+ set_up_distributor_for_sequencing_test();
+
+ Operation::SP generated = start_operation_verify_not_rejected(makePutCommand("testdoctype1", _dummy_id));
+ start_operation_verify_rejected(makeRemoveCommand(_dummy_id));
+ start_operation_verify_rejected(makeUpdateCommand("testdoctype1", _dummy_id));
+}
+
+void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled() {
+ set_up_distributor_for_sequencing_test();
+
+ // Should be able to modify config after links have been created, i.e. this is a live config.
+ getConfig().setSequenceMutatingOperations(false);
+
+ Operation::SP generated = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
+ // Sequencing is disabled, so concurrent op is not rejected.
+ start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id));
+}
+
+// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
+// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
+// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
+// former to the latter is not trivial unless we introduce higher level "location" mutation
+// pseudo-locks in the sequencer. I.e. if we get a RemoveLocation with id.user==123456, this
+// prevents any handles from being acquired to any GID under location BucketId(32, 123456).
+
} // distributor
} // storage
diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp
new file mode 100644
index 00000000000..c9f9fb4b0ed
--- /dev/null
+++ b/storage/src/tests/distributor/operation_sequencer_test.cpp
@@ -0,0 +1,62 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/storage/distributor/operation_sequencer.h>
+#include <vespa/document/base/documentid.h>
+
+namespace storage::distributor {
+
+using document::DocumentId;
+
+class OperationSequencerTest : public CppUnit::TestFixture {
+ CPPUNIT_TEST_SUITE(OperationSequencerTest);
+ CPPUNIT_TEST(can_get_sequencing_handle_for_id_without_existing_handle);
+ CPPUNIT_TEST(can_get_sequencing_handle_for_different_ids);
+ CPPUNIT_TEST(cannot_get_sequencing_handle_for_id_with_existing_handle);
+ CPPUNIT_TEST(releasing_handle_allows_for_getting_new_handles_for_id);
+ CPPUNIT_TEST_SUITE_END();
+
+ void can_get_sequencing_handle_for_id_without_existing_handle();
+ void can_get_sequencing_handle_for_different_ids();
+ void cannot_get_sequencing_handle_for_id_with_existing_handle();
+ void releasing_handle_allows_for_getting_new_handles_for_id();
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(OperationSequencerTest);
+
+void OperationSequencerTest::can_get_sequencing_handle_for_id_without_existing_handle() {
+ OperationSequencer sequencer;
+ auto handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ CPPUNIT_ASSERT(handle.valid());
+}
+
+void OperationSequencerTest::cannot_get_sequencing_handle_for_id_with_existing_handle() {
+ OperationSequencer sequencer;
+ auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ CPPUNIT_ASSERT(! second_handle.valid());
+}
+
+void OperationSequencerTest::can_get_sequencing_handle_for_different_ids() {
+ OperationSequencer sequencer;
+ auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::efgh"));
+ CPPUNIT_ASSERT(first_handle.valid());
+ CPPUNIT_ASSERT(second_handle.valid());
+}
+
+void OperationSequencerTest::releasing_handle_allows_for_getting_new_handles_for_id() {
+ OperationSequencer sequencer;
+ auto first_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ // Explicit release
+ first_handle.release();
+ {
+ auto second_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ CPPUNIT_ASSERT(second_handle.valid());
+ // Implicit release by scope exit
+ }
+ auto third_handle = sequencer.try_acquire(DocumentId("id:foo:test::abcd"));
+ CPPUNIT_ASSERT(third_handle.valid());
+}
+
+} // storage::distributor
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index a5ce431d224..10d135ccf2d 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -32,6 +32,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_enableInconsistentJoin(false),
_enableHostInfoReporting(true),
_disableBucketActivation(false),
+ _sequenceMutatingOperations(true),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -141,6 +142,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_enableHostInfoReporting = config.enableHostInfoReporting;
_disableBucketActivation = config.disableBucketActivation;
+ _sequenceMutatingOperations = config.sequenceMutatingOperations;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index fd8ed0c272b..ed9e14808ee 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -230,6 +230,13 @@ public:
std::chrono::seconds getMaxClusterClockSkew() const noexcept {
return _maxClusterClockSkew;
}
+
+ bool getSequenceMutatingOperations() const noexcept {
+ return _sequenceMutatingOperations;
+ }
+ void setSequenceMutatingOperations(bool sequenceMutations) noexcept {
+ _sequenceMutatingOperations = sequenceMutations;
+ }
private:
DistributorConfiguration(const DistributorConfiguration& other);
@@ -268,6 +275,7 @@ private:
bool _enableInconsistentJoin;
bool _enableHostInfoReporting;
bool _disableBucketActivation;
+ bool _sequenceMutatingOperations;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 142ebc733f2..e73ddedae2c 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -174,3 +174,9 @@ disable_bucket_activation bool default=false
## transfers.
## Zero means this mechanism is disabled.
max_cluster_clock_skew_sec int default=1
+
+## If set, a distributor will only allow one active operation per document ID
+## for puts, updates and removes. This helps prevent issues caused by concurrent
+## modifications to documents when sent from multiple feed clients.
+sequence_mutating_operations bool default=true
+
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index f0f7881ebd2..3d970b27dbf 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -35,6 +35,7 @@ vespa_add_library(storage_distributor
managed_bucket_space.cpp
managed_bucket_space_component.cpp
managed_bucket_space_repo.cpp
+ operation_sequencer.cpp
$<TARGET_OBJECTS:storage_distributoroperation>
$<TARGET_OBJECTS:storage_distributoroperationexternal>
$<TARGET_OBJECTS:storage_distributoroperationidealstate>
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index c5354c54f25..be9d02c11f3 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -1,4 +1,4 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "externaloperationhandler.h"
#include "distributor.h"
@@ -19,7 +19,6 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/batch.h>
#include <vespa/storageapi/message/stat.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.manager");
@@ -101,6 +100,27 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(
return true;
}
+std::shared_ptr<api::StorageMessage>
+ExternalOperationHandler::makeConcurrentMutationRejectionReply(
+ api::StorageCommand& cmd,
+ const document::DocumentId& docId) const {
+ api::StorageReply::UP reply(cmd.makeReply());
+ reply->setResult(api::ReturnCode(
+ api::ReturnCode::BUSY, vespalib::make_string(
+ "A mutating operation for document '%s' is already in progress",
+ docId.toString().c_str())));
+ return std::shared_ptr<api::StorageMessage>(reply.release());
+}
+
+bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) const {
+ const auto& config(getDistributor().getConfig());
+ if (!config.getSequenceMutatingOperations()) {
+ // Sequencing explicitly disabled, so always allow.
+ return true;
+ }
+ return handle.valid();
+}
+
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
{
if (!checkTimestampMutationPreconditions(
@@ -114,9 +134,12 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
cmd->setTimestamp(getUniqueTimestamp());
}
- _op = Operation::SP(new PutOperation(*this,
- cmd,
- getMetrics().puts[cmd->getLoadType()]));
+ auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
+ if (allowMutation(handle)) {
+ _op = std::make_shared<PutOperation>(*this, cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle));
+ } else {
+ sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId()));
+ }
return true;
}
@@ -134,7 +157,13 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
if (cmd->getTimestamp() == 0) {
cmd->setTimestamp(getUniqueTimestamp());
}
- _op = Operation::SP(new TwoPhaseUpdateOperation(*this, cmd, getMetrics()));
+ auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
+ if (allowMutation(handle)) {
+ _op = std::make_shared<TwoPhaseUpdateOperation>(*this, cmd, getMetrics(), std::move(handle));
+ } else {
+ sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId()));
+ }
+
return true;
}
@@ -151,10 +180,17 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
if (cmd->getTimestamp() == 0) {
cmd->setTimestamp(getUniqueTimestamp());
}
- _op = Operation::SP(new RemoveOperation(
- *this,
- cmd,
- getMetrics().removes[cmd->getLoadType()]));
+ auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
+ if (allowMutation(handle)) {
+ _op = std::make_shared<RemoveOperation>(
+ *this,
+ cmd,
+ getMetrics().removes[cmd->getLoadType()],
+ std::move(handle));
+ } else {
+ sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId()));
+ }
+
return true;
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index d8f35e7e9e5..75ade3e2128 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -1,6 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "operation_sequencer.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -53,6 +54,7 @@ public:
private:
const MaintenanceOperationGenerator& _operationGenerator;
+ OperationSequencer _mutationSequencer;
Operation::SP _op;
TimePoint _rejectFeedBeforeTimeReached;
@@ -62,6 +64,10 @@ private:
api::StorageCommand& cmd,
const document::BucketId& bucket,
PersistenceOperationMetricSet& persistenceMetrics);
+ std::shared_ptr<api::StorageMessage> makeConcurrentMutationRejectionReply(
+ api::StorageCommand& cmd,
+ const document::DocumentId& docId) const;
+ bool allowMutation(const SequencingHandle& handle) const;
DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); }
};
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
new file mode 100644
index 00000000000..405df6a4ce9
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp
@@ -0,0 +1,40 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "operation_sequencer.h"
+#include <vespa/document/base/documentid.h>
+#include <cassert>
+
+namespace storage {
+namespace distributor {
+
+void SequencingHandle::release() {
+ if (valid()) {
+ _sequencer->release(*this);
+ _sequencer = nullptr;
+ }
+}
+
+OperationSequencer::OperationSequencer() {
+}
+
+OperationSequencer::~OperationSequencer() {
+}
+
+SequencingHandle OperationSequencer::try_acquire(const document::DocumentId& id) {
+ const document::GlobalId gid(id.getGlobalId());
+ const auto inserted = _active_gids.insert(gid);
+ if (inserted.second) {
+ return SequencingHandle(*this, gid);
+ } else {
+ return SequencingHandle();
+ }
+}
+
+void OperationSequencer::release(const SequencingHandle& handle) {
+ assert(handle.valid());
+ _active_gids.erase(handle.gid());
+}
+
+} // distributor
+} // storage
+
diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h
new file mode 100644
index 00000000000..2a00a237bd5
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operation_sequencer.h
@@ -0,0 +1,85 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/base/globalid.h>
+#include <vespa/vespalib/stllike/hash_set.h>
+#include <utility>
+
+namespace document {
+class DocumentId;
+}
+
+namespace storage::distributor {
+
+class OperationSequencer;
+
+/**
+ * Represents a move-only handle which effectively holds a guard for
+ * allowing sequenced operations towards a particular document ID.
+ *
+ * Destroying a handle will implicitly release the guard, allowing
+ * new sequenced operations towards the ID.
+ */
+class SequencingHandle {
+ OperationSequencer* _sequencer;
+ document::GlobalId _gid;
+public:
+ SequencingHandle() : _sequencer(nullptr) {}
+ SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid)
+ : _sequencer(&sequencer),
+ _gid(gid)
+ {
+ }
+
+ ~SequencingHandle() {
+ release();
+ }
+
+ SequencingHandle(const SequencingHandle&) = delete;
+ SequencingHandle& operator=(const SequencingHandle&) = delete;
+
+ SequencingHandle(SequencingHandle&& rhs) noexcept
+ : _sequencer(rhs._sequencer),
+ _gid(rhs._gid)
+ {
+ rhs._sequencer = nullptr;
+ }
+
+ SequencingHandle& operator=(SequencingHandle&& rhs) noexcept {
+ if (&rhs != this) {
+ std::swap(_sequencer, rhs._sequencer);
+ std::swap(_gid, rhs._gid);
+ }
+ return *this;
+ }
+
+ bool valid() const noexcept { return (_sequencer != nullptr); }
+ const document::GlobalId& gid() const noexcept { return _gid; }
+ void release();
+};
+
+/**
+ * An operation sequencer allows for efficiently checking if an operation is
+ * already pending for a given document ID (with very high probability; false
+ * positives are possible, but false negatives are not).
+ *
+ * When a SequencingHandle is acquired for a given ID, no further valid handles
+ * can be acquired for that ID until the original handle has been destroyed.
+ */
+class OperationSequencer {
+ using GidSet = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>;
+ GidSet _active_gids;
+
+ friend class SequencingHandle;
+public:
+ OperationSequencer();
+ ~OperationSequencer();
+
+ // Returns a handle with valid() == true iff no concurrent operations are
+ // already active for `id`.
+ SequencingHandle try_acquire(const document::DocumentId& id);
+private:
+ void release(const SequencingHandle& handle);
+};
+
+} // storage::distributor \ No newline at end of file
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 89fe8146d78..88657bb1be6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -21,8 +21,9 @@ using namespace storage;
PutOperation::PutOperation(DistributorComponent& manager,
const std::shared_ptr<api::PutCommand> & msg,
- PersistenceOperationMetricSet& metric)
- : Operation(),
+ PersistenceOperationMetricSet& metric,
+ SequencingHandle sequencingHandle)
+ : SequencedOperation(std::move(sequencingHandle)),
_trackerInstance(metric,
std::shared_ptr<api::BucketInfoReply>(new api::PutReply(*msg)),
manager,
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 6ba9e9c3dbf..6642669b317 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -2,7 +2,7 @@
#pragma once
-#include <vespa/storage/distributor/operations/operation.h>
+#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
#include <vespa/storage/distributor/operationtargetresolver.h>
@@ -20,12 +20,13 @@ namespace api {
}
namespace distributor {
-class PutOperation : public Operation
+class PutOperation : public SequencedOperation
{
public:
PutOperation(DistributorComponent& manager,
const std::shared_ptr<api::PutCommand> & msg,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ SequencingHandle sequencingHandle = SequencingHandle());
void onStart(DistributorMessageSender& sender) override;
const char* getName() const override { return "put"; };
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 596debf54a6..a8641e52acf 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -13,9 +13,10 @@ using namespace storage::distributor;
using namespace storage;
RemoveOperation::RemoveOperation(DistributorComponent& manager,
- const std::shared_ptr<api::RemoveCommand> & msg,
- PersistenceOperationMetricSet& metric)
- : Operation(),
+ const std::shared_ptr<api::RemoveCommand> & msg,
+ PersistenceOperationMetricSet& metric,
+ SequencingHandle sequencingHandle)
+ : SequencedOperation(std::move(sequencingHandle)),
_trackerInstance(metric,
std::shared_ptr<api::BucketInfoReply>(new api::RemoveReply(*msg)),
manager, msg->getTimestamp()),
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index b85170a0920..1e5b1ca4c8f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -1,7 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/storage/distributor/operations/operation.h>
+#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
namespace storage {
@@ -12,12 +12,13 @@ class RemoveCommand;
namespace distributor {
-class RemoveOperation : public Operation
+class RemoveOperation : public SequencedOperation
{
public:
RemoveOperation(DistributorComponent& manager,
const std::shared_ptr<api::RemoveCommand> & msg,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ SequencingHandle sequencingHandle = SequencingHandle());
void onStart(DistributorMessageSender& sender);
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 7b1ff03b8af..5b907d21fcd 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -19,10 +19,11 @@ namespace storage {
namespace distributor {
TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
- DistributorComponent& manager,
- const std::shared_ptr<api::UpdateCommand>& msg,
- DistributorMetricSet& metrics)
- : Operation(),
+ DistributorComponent& manager,
+ const std::shared_ptr<api::UpdateCommand>& msg,
+ DistributorMetricSet& metrics,
+ SequencingHandle sequencingHandle)
+ : SequencedOperation(std::move(sequencingHandle)),
_updateMetric(metrics.updates[msg->getLoadType()]),
_putMetric(metrics.update_puts[msg->getLoadType()]),
_getMetric(metrics.update_gets[msg->getLoadType()]),
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 7cff0aba493..c50b38f8af2 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -4,6 +4,7 @@
#include <set>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
+#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/document/update/documentupdate.h>
namespace document {
@@ -44,12 +45,13 @@ namespace distributor {
*/
-class TwoPhaseUpdateOperation : public Operation
+class TwoPhaseUpdateOperation : public SequencedOperation
{
public:
TwoPhaseUpdateOperation(DistributorComponent& manager,
const std::shared_ptr<api::UpdateCommand> & msg,
- DistributorMetricSet& metrics);
+ DistributorMetricSet& metrics,
+ SequencingHandle sequencingHandle = SequencingHandle());
~TwoPhaseUpdateOperation();
void onStart(DistributorMessageSender& sender) override;
diff --git a/storage/src/vespa/storage/distributor/operations/sequenced_operation.h b/storage/src/vespa/storage/distributor/operations/sequenced_operation.h
new file mode 100644
index 00000000000..c4b37eba9e0
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/sequenced_operation.h
@@ -0,0 +1,25 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "operation.h"
+#include <vespa/storage/distributor/operation_sequencer.h>
+
+namespace storage::distributor {
+
+/**
+ * A sequenced operation is an operation whose concurrency against a specific document ID
+ * may be limited by the distributor to avoid race conditions caused by concurrent
+ * modifications.
+ */
+class SequencedOperation : public Operation {
+ SequencingHandle _sequencingHandle;
+public:
+ SequencedOperation() : Operation(), _sequencingHandle() {}
+
+ explicit SequencedOperation(SequencingHandle sequencingHandle)
+ : Operation(),
+ _sequencingHandle(std::move(sequencingHandle)) {
+ }
+};
+
+} // storage::distributor \ No newline at end of file