aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-01-17 12:58:28 +0100
committerGitHub <noreply@github.com>2020-01-17 12:58:28 +0100
commitca80f4634d471c078047337e1d10a7f73c163900 (patch)
tree9228ba22c84ef0814453f555b3dfff925204a195 /storage
parent2635cb9c2a8b1cd0a548185e3c7a07ee64f842b1 (diff)
parent72cec61c615bfe25628c375eade4c8d8832cd2da (diff)
Merge pull request #11830 from vespa-engine/vekterli/support-weak-internal-read-consistency-for-client-gets
Vekterli/support weak internal read consistency for client gets
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp19
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp27
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp26
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp16
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def9
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h8
-rw-r--r--storage/src/vespa/storage/persistence/messages.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp14
14 files changed, 154 insertions, 7 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 83ef7891630..afc3b254c64 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -187,6 +187,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_use_weak_internal_read_consistency(bool use_weak) {
+ ConfigBuilder builder;
+ builder.useWeakInternalReadConsistencyForClientGets = use_weak;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1038,6 +1044,19 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config)
EXPECT_FALSE(getConfig().merge_operations_disabled());
}
+TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_use_weak_internal_read_consistency(true);
+ EXPECT_TRUE(getConfig().use_weak_internal_read_consistency_for_client_gets());
+ EXPECT_TRUE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets());
+
+ configure_use_weak_internal_read_consistency(false);
+ EXPECT_FALSE(getConfig().use_weak_internal_read_consistency_for_client_gets());
+ EXPECT_FALSE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets());
+}
+
TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
createLinks(false);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index f2f949a6d56..03ba148277e 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -83,6 +83,7 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil {
close();
}
+ void do_test_get_weak_consistency_is_propagated(bool use_weak);
};
TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
@@ -534,6 +535,32 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per
_sender.reply(0)->getResult().toString());
}
+void ExternalOperationHandlerTest::do_test_get_weak_consistency_is_propagated(bool use_weak) {
+ createLinks();
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
+ // Explicitly only touch config in the case weak consistency is enabled to ensure the
+ // default is strong.
+ if (use_weak) {
+ getExternalOperationHandler().set_use_weak_internal_read_consistency_for_gets(true);
+ }
+ document::BucketId b(16, 1234);
+ Operation::SP op;
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(
+ makeGetCommandForUser(b.withoutCountBits()), op));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ EXPECT_EQ(get_op.desired_read_consistency(),
+ (use_weak ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong));
+}
+
+TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_strong_consistency_by_default) {
+ do_test_get_weak_consistency_is_propagated(false);
+}
+
+TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_config_enabled) {
+ do_test_get_weak_consistency_is_propagated(true);
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index db790639869..0dbec8444cc 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -51,12 +51,13 @@ struct GetOperationTest : Test, DistributorTestUtil {
op.reset();
}
- void sendGet() {
+ void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]");
op = std::make_unique<GetOperation>(
getExternalOperationHandler(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
- msg, getDistributor().getMetrics(). gets[msg->getLoadType()]);
+ msg, getDistributor().getMetrics(). gets[msg->getLoadType()],
+ consistency);
op->start(_sender, framework::MilliSecTime(0));
}
@@ -127,6 +128,8 @@ struct GetOperationTest : Test, DistributorTestUtil {
void setClusterState(const std::string& clusterState) {
enableDistributorClusterState(clusterState);
}
+
+ void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency);
};
GetOperationTest::GetOperationTest() = default;
@@ -497,4 +500,23 @@ TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
EXPECT_EQ("Get => 0", _sender.getCommands(true));
}
+void GetOperationTest::do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency) {
+ setClusterState("distributor:1 storage:1");
+ addNodesToBucketDB(bucketId, "0=4");
+ sendGet(consistency);
+ ASSERT_TRUE(op);
+ EXPECT_EQ(dynamic_cast<GetOperation&>(*op).desired_read_consistency(), consistency);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true));
+ auto& cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ(cmd.internal_read_consistency(), consistency);
+}
+
+TEST_F(GetOperationTest, can_send_gets_with_strong_internal_read_consistency) {
+ do_test_read_consistency_is_propagated(api::InternalReadConsistency::Strong);
+}
+
+TEST_F(GetOperationTest, can_send_gets_with_weak_internal_read_consistency) {
+ do_test_read_consistency_is_propagated(api::InternalReadConsistency::Weak);
+}
+
}
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index c8bfcd754f7..f1900c40d56 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -1088,6 +1088,22 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun
ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
}
+// The weak consistency config _only_ applies to Get operations initiated directly
+// by the client, not those indirectly initiated by the distributor in order to
+// fulfill update write-repairs.
+TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency_even_if_weak_consistency_configured) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_use_weak_internal_read_consistency_for_client_gets(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0));
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index ed14e227fc1..522561ee8a5 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -41,6 +41,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_allowStaleReadsDuringClusterStateTransitions(false),
_update_fast_path_restart_enabled(false),
_merge_operations_disabled(false),
+ _use_weak_internal_read_consistency_for_client_gets(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -153,6 +154,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
_update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_merge_operations_disabled = config.mergeOperationsDisabled;
+ _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 5b01e37992b..333d7073715 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -228,6 +228,13 @@ public:
_merge_operations_disabled = disabled;
}
+ void set_use_weak_internal_read_consistency_for_client_gets(bool use_weak) noexcept {
+ _use_weak_internal_read_consistency_for_client_gets = use_weak;
+ }
+ bool use_weak_internal_read_consistency_for_client_gets() const noexcept {
+ return _use_weak_internal_read_consistency_for_client_gets;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -273,6 +280,7 @@ private:
bool _allowStaleReadsDuringClusterStateTransitions;
bool _update_fast_path_restart_enabled;
bool _merge_operations_disabled;
+ bool _use_weak_internal_read_consistency_for_client_gets;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 4587e7b3ebe..e4d3182ce7a 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -219,3 +219,12 @@ restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=
## This is ONLY intended for system testing of certain transient edge cases and
## MUST NOT be set to true in a production environment.
merge_operations_disabled bool default=false
+
+## If set, Get operations that are initiated by the client (i.e. _not_ Get operations
+## that are initiated by the distributor) will be forwarded to the backend with
+## a flag signalling that weak read consistency may be used. This allows the
+## backend to minimize internal locking. The downside is that it's not guaranteed
+## to observe the most recent writes to the document, nor to observe an atomically
+## consistent view of fields across document versions.
+## This is mostly useful in a system that is effectively read-only.
+use_weak_internal_read_consistency_for_client_gets bool default=false
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 5c6773ea5bf..988af43a7be 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -849,6 +849,8 @@ Distributor::enableNextConfig()
// Concurrent reads are only safe if the B-tree DB implementation is used.
_externalOperationHandler.set_concurrent_gets_enabled(
_use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions());
+ _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets(
+ getConfig().use_weak_internal_read_consistency_for_client_gets());
}
void
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index adeab7ba132..b946b788391 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -62,7 +62,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
_rejectFeedBeforeTimeReached(), // At epoch
_non_main_thread_ops_mutex(),
_non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()),
- _concurrent_gets_enabled(false)
+ _concurrent_gets_enabled(false),
+ _use_weak_internal_read_consistency_for_gets(false)
{
}
@@ -323,6 +324,12 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
return true;
}
+api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consistency() const noexcept {
+ return (use_weak_internal_read_consistency_for_gets()
+ ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong);
+}
+
std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr<api::GetCommand>& cmd) {
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
auto& metrics = getMetrics().gets[cmd->getLoadType()];
@@ -342,7 +349,8 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(
const auto* space_repo = snapshot.bucket_space_repo();
assert(space_repo != nullptr);
return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
- snapshot.steal_read_guard(), cmd, metrics);
+ snapshot.steal_read_guard(), cmd, metrics,
+ desired_get_read_consistency());
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index e38f6792717..96875a3644a 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -66,6 +66,14 @@ public:
return _concurrent_gets_enabled.load(std::memory_order_relaxed);
}
+ void set_use_weak_internal_read_consistency_for_gets(bool use_weak) noexcept {
+ _use_weak_internal_read_consistency_for_gets.store(use_weak, std::memory_order_relaxed);
+ }
+
+ bool use_weak_internal_read_consistency_for_gets() const noexcept {
+ return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed);
+ }
+
private:
std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
@@ -75,6 +83,7 @@ private:
mutable std::mutex _non_main_thread_ops_mutex;
OperationOwner _non_main_thread_ops_owner;
std::atomic<bool> _concurrent_gets_enabled;
+ std::atomic<bool> _use_weak_internal_read_consistency_for_gets;
template <typename Func>
void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
@@ -103,6 +112,8 @@ private:
PersistenceOperationMetricSet& persistenceMetrics) const;
bool allowMutation(const SequencingHandle& handle) const;
+ api::InternalReadConsistency desired_get_read_consistency() const noexcept;
+
DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); }
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 52b6da09a76..9e66c212ac6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -48,7 +48,8 @@ GetOperation::GetOperation(DistributorComponent& manager,
const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
- PersistenceOperationMetricSet& metric)
+ PersistenceOperationMetricSet& metric,
+ api::InternalReadConsistency desired_read_consistency)
: Operation(),
_manager(manager),
_bucketSpace(bucketSpace),
@@ -58,6 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager,
_lastModified(),
_metric(metric),
_operationTimer(manager.getClock()),
+ _desired_read_consistency(desired_read_consistency),
_has_replica_inconsistency(false)
{
assignTargetNodeGroups(*read_guard);
@@ -104,6 +106,7 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::
auto command = std::make_shared<api::GetCommand>(bucket, _msg->getDocumentId(),
_msg->getFieldSet(), _msg->getBeforeTimestamp());
copyMessageSettings(*_msg, *command);
+ command->set_internal_read_consistency(_desired_read_consistency);
LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 69f7b5580f4..1106968bcf7 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -28,7 +28,8 @@ public:
const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong);
void onClose(DistributorMessageSender& sender) override;
void onStart(DistributorMessageSender& sender) override;
@@ -45,6 +46,10 @@ public:
return _replicas_in_db;
}
+ api::InternalReadConsistency desired_read_consistency() const noexcept {
+ return _desired_read_consistency;
+ }
+
private:
class GroupId {
public:
@@ -94,6 +99,7 @@ private:
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
+ api::InternalReadConsistency _desired_read_consistency;
bool _has_replica_inconsistency;
void sendReply(DistributorMessageSender& sender);
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index d0572e7dbf8..82dc69156ab 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -25,7 +25,7 @@ public:
GetIterCommand(const document::Bucket &bucket,
const spi::IteratorId iteratorId,
uint32_t maxByteSize);
- ~GetIterCommand();
+ ~GetIterCommand() override;
std::unique_ptr<api::StorageReply> makeReply() override;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index b738a9ecf00..7d0ce26b83d 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -164,6 +164,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
return tracker;
}
+namespace {
+
+spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept {
+ switch (consistency) {
+ case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG;
+ case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK;
+ default: abort();
+ }
+}
+
+}
+
MessageTracker::UP
PersistenceThread::handleGet(api::GetCommand& cmd)
{
@@ -173,6 +185,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
document::FieldSetRepo repo;
document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
+ // _context is reset per command, so it's safe to modify it like this.
+ _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
spi::GetResult result =
_spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);