summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-01-17 09:53:05 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-01-17 10:02:20 +0000
commit72cec61c615bfe25628c375eade4c8d8832cd2da (patch)
treeb0cf200520cc40a9e42ffb874b712d02c910ef32 /storage
parentb9b0cf87f7335fed7af9854a2f1a63617c29451a (diff)
Add configurable support for weakly consistent client Gets
If configured, Get operations initiated by the client are flagged with weak internal consistency. This allows the backend to bypass certain internal synchronization mechanisms, which minimizes latency at the cost of possibly not observing a consistent view of the document. This config should only be used in a very restricted set of cases where the document set is effectively read-only, or cross- field consistency or freshness does not matter. To enable the weak consistency, use an explicit config override: ``` <config name="vespa.config.content.core.stor-distributormanager"> <use_weak_internal_read_consistency_for_client_gets> true </use_weak_internal_read_consistency_for_client_gets> </config> ``` This closes #11811
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp19
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp27
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp26
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp16
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def9
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h8
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp14
13 files changed, 153 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 83ef7891630..afc3b254c64 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -187,6 +187,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_use_weak_internal_read_consistency(bool use_weak) {
+ ConfigBuilder builder;
+ builder.useWeakInternalReadConsistencyForClientGets = use_weak;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1038,6 +1044,19 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config)
EXPECT_FALSE(getConfig().merge_operations_disabled());
}
+TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_use_weak_internal_read_consistency(true);
+ EXPECT_TRUE(getConfig().use_weak_internal_read_consistency_for_client_gets());
+ EXPECT_TRUE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets());
+
+ configure_use_weak_internal_read_consistency(false);
+ EXPECT_FALSE(getConfig().use_weak_internal_read_consistency_for_client_gets());
+ EXPECT_FALSE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets());
+}
+
TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
createLinks(false);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index f2f949a6d56..03ba148277e 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -83,6 +83,7 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil {
close();
}
+ void do_test_get_weak_consistency_is_propagated(bool use_weak);
};
TEST_F(ExternalOperationHandlerTest, bucket_split_mask) {
@@ -534,6 +535,32 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per
_sender.reply(0)->getResult().toString());
}
+void ExternalOperationHandlerTest::do_test_get_weak_consistency_is_propagated(bool use_weak) {
+ createLinks();
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
+ // Explicitly only touch config in the case weak consistency is enabled to ensure the
+ // default is strong.
+ if (use_weak) {
+ getExternalOperationHandler().set_use_weak_internal_read_consistency_for_gets(true);
+ }
+ document::BucketId b(16, 1234);
+ Operation::SP op;
+ ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(
+ makeGetCommandForUser(b.withoutCountBits()), op));
+ auto& get_op = dynamic_cast<GetOperation&>(*op);
+ EXPECT_EQ(get_op.desired_read_consistency(),
+ (use_weak ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong));
+}
+
+TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_strong_consistency_by_default) {
+ do_test_get_weak_consistency_is_propagated(false);
+}
+
+TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_config_enabled) {
+ do_test_get_weak_consistency_is_propagated(true);
+}
+
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
// the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket
// sub tree under a given location, while the sequencer works on individual GIDs. Mapping the
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index db790639869..0dbec8444cc 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -51,12 +51,13 @@ struct GetOperationTest : Test, DistributorTestUtil {
op.reset();
}
- void sendGet() {
+ void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]");
op = std::make_unique<GetOperation>(
getExternalOperationHandler(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
- msg, getDistributor().getMetrics(). gets[msg->getLoadType()]);
+ msg, getDistributor().getMetrics(). gets[msg->getLoadType()],
+ consistency);
op->start(_sender, framework::MilliSecTime(0));
}
@@ -127,6 +128,8 @@ struct GetOperationTest : Test, DistributorTestUtil {
void setClusterState(const std::string& clusterState) {
enableDistributorClusterState(clusterState);
}
+
+ void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency);
};
GetOperationTest::GetOperationTest() = default;
@@ -497,4 +500,23 @@ TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
EXPECT_EQ("Get => 0", _sender.getCommands(true));
}
+void GetOperationTest::do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency) {
+ setClusterState("distributor:1 storage:1");
+ addNodesToBucketDB(bucketId, "0=4");
+ sendGet(consistency);
+ ASSERT_TRUE(op);
+ EXPECT_EQ(dynamic_cast<GetOperation&>(*op).desired_read_consistency(), consistency);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true));
+ auto& cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ(cmd.internal_read_consistency(), consistency);
+}
+
+TEST_F(GetOperationTest, can_send_gets_with_strong_internal_read_consistency) {
+ do_test_read_consistency_is_propagated(api::InternalReadConsistency::Strong);
+}
+
+TEST_F(GetOperationTest, can_send_gets_with_weak_internal_read_consistency) {
+ do_test_read_consistency_is_propagated(api::InternalReadConsistency::Weak);
+}
+
}
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index c8bfcd754f7..f1900c40d56 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -1088,6 +1088,22 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun
ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
}
+// The weak consistency config _only_ applies to Get operations initiated directly
+// by the client, not those indirectly initiated by the distributor in order to
+// fulfill update write-repairs.
+TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency_even_if_weak_consistency_configured) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_use_weak_internal_read_consistency_for_client_gets(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0));
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index ed14e227fc1..522561ee8a5 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -41,6 +41,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_allowStaleReadsDuringClusterStateTransitions(false),
_update_fast_path_restart_enabled(false),
_merge_operations_disabled(false),
+ _use_weak_internal_read_consistency_for_client_gets(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -153,6 +154,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
_update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_merge_operations_disabled = config.mergeOperationsDisabled;
+ _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 5b01e37992b..333d7073715 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -228,6 +228,13 @@ public:
_merge_operations_disabled = disabled;
}
+ void set_use_weak_internal_read_consistency_for_client_gets(bool use_weak) noexcept {
+ _use_weak_internal_read_consistency_for_client_gets = use_weak;
+ }
+ bool use_weak_internal_read_consistency_for_client_gets() const noexcept {
+ return _use_weak_internal_read_consistency_for_client_gets;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -273,6 +280,7 @@ private:
bool _allowStaleReadsDuringClusterStateTransitions;
bool _update_fast_path_restart_enabled;
bool _merge_operations_disabled;
+ bool _use_weak_internal_read_consistency_for_client_gets;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 4587e7b3ebe..e4d3182ce7a 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -219,3 +219,12 @@ restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=
## This is ONLY intended for system testing of certain transient edge cases and
## MUST NOT be set to true in a production environment.
merge_operations_disabled bool default=false
+
+## If set, Get operations that are initiated by the client (i.e. _not_ Get operations
+## that are initiated by the distributor) will be forwarded to the backend with
+## a flag signalling that weak read consistency may be used. This allows the
+## backend to minimize internal locking. The downside is that it's not guaranteed
+## to observe the most recent writes to the document, nor to observe an atomically
+## consistent view of fields across document versions.
+## This is mostly useful in a system that is effectively read-only.
+use_weak_internal_read_consistency_for_client_gets bool default=false
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 5c6773ea5bf..988af43a7be 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -849,6 +849,8 @@ Distributor::enableNextConfig()
// Concurrent reads are only safe if the B-tree DB implementation is used.
_externalOperationHandler.set_concurrent_gets_enabled(
_use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions());
+ _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets(
+ getConfig().use_weak_internal_read_consistency_for_client_gets());
}
void
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index adeab7ba132..b946b788391 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -62,7 +62,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
_rejectFeedBeforeTimeReached(), // At epoch
_non_main_thread_ops_mutex(),
_non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()),
- _concurrent_gets_enabled(false)
+ _concurrent_gets_enabled(false),
+ _use_weak_internal_read_consistency_for_gets(false)
{
}
@@ -323,6 +324,12 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
return true;
}
+api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consistency() const noexcept {
+ return (use_weak_internal_read_consistency_for_gets()
+ ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong);
+}
+
std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr<api::GetCommand>& cmd) {
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
auto& metrics = getMetrics().gets[cmd->getLoadType()];
@@ -342,7 +349,8 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(
const auto* space_repo = snapshot.bucket_space_repo();
assert(space_repo != nullptr);
return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
- snapshot.steal_read_guard(), cmd, metrics);
+ snapshot.steal_read_guard(), cmd, metrics,
+ desired_get_read_consistency());
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index e38f6792717..96875a3644a 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -66,6 +66,14 @@ public:
return _concurrent_gets_enabled.load(std::memory_order_relaxed);
}
+ void set_use_weak_internal_read_consistency_for_gets(bool use_weak) noexcept {
+ _use_weak_internal_read_consistency_for_gets.store(use_weak, std::memory_order_relaxed);
+ }
+
+ bool use_weak_internal_read_consistency_for_gets() const noexcept {
+ return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed);
+ }
+
private:
std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
@@ -75,6 +83,7 @@ private:
mutable std::mutex _non_main_thread_ops_mutex;
OperationOwner _non_main_thread_ops_owner;
std::atomic<bool> _concurrent_gets_enabled;
+ std::atomic<bool> _use_weak_internal_read_consistency_for_gets;
template <typename Func>
void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
@@ -103,6 +112,8 @@ private:
PersistenceOperationMetricSet& persistenceMetrics) const;
bool allowMutation(const SequencingHandle& handle) const;
+ api::InternalReadConsistency desired_get_read_consistency() const noexcept;
+
DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); }
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 52b6da09a76..9e66c212ac6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -48,7 +48,8 @@ GetOperation::GetOperation(DistributorComponent& manager,
const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
- PersistenceOperationMetricSet& metric)
+ PersistenceOperationMetricSet& metric,
+ api::InternalReadConsistency desired_read_consistency)
: Operation(),
_manager(manager),
_bucketSpace(bucketSpace),
@@ -58,6 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager,
_lastModified(),
_metric(metric),
_operationTimer(manager.getClock()),
+ _desired_read_consistency(desired_read_consistency),
_has_replica_inconsistency(false)
{
assignTargetNodeGroups(*read_guard);
@@ -104,6 +106,7 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::
auto command = std::make_shared<api::GetCommand>(bucket, _msg->getDocumentId(),
_msg->getFieldSet(), _msg->getBeforeTimestamp());
copyMessageSettings(*_msg, *command);
+ command->set_internal_read_consistency(_desired_read_consistency);
LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 69f7b5580f4..1106968bcf7 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -28,7 +28,8 @@ public:
const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong);
void onClose(DistributorMessageSender& sender) override;
void onStart(DistributorMessageSender& sender) override;
@@ -45,6 +46,10 @@ public:
return _replicas_in_db;
}
+ api::InternalReadConsistency desired_read_consistency() const noexcept {
+ return _desired_read_consistency;
+ }
+
private:
class GroupId {
public:
@@ -94,6 +99,7 @@ private:
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
+ api::InternalReadConsistency _desired_read_consistency;
bool _has_replica_inconsistency;
void sendReply(DistributorMessageSender& sender);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index b738a9ecf00..7d0ce26b83d 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -164,6 +164,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
return tracker;
}
+namespace {
+
+spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept {
+ switch (consistency) {
+ case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG;
+ case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK;
+ default: abort();
+ }
+}
+
+}
+
MessageTracker::UP
PersistenceThread::handleGet(api::GetCommand& cmd)
{
@@ -173,6 +185,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
document::FieldSetRepo repo;
document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
+ // _context is reset per command, so it's safe to modify it like this.
+ _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
spi::GetResult result =
_spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);