summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-04 12:58:21 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-08 12:20:52 +0000
commitecd57816948aeaa143f9686dea971fa01763e2e4 (patch)
tree8544b9ca2e047a7d1999f3ae7df593b27dc6727a /storage
parent5bedd856c54812faa780d52236c1cf6394ed3fd9 (diff)
Allow executing Get operations outside the main distributor thread
Requires _both_ B-tree DB to be used _and_ stale reads to be enabled.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp24
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp4
-rw-r--r--storage/src/tests/distributor/distributortestutil.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp38
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.h14
-rw-r--r--storage/src/vespa/storage/distributor/operationstarter.h2
8 files changed, 83 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 637c214033d..5edd8c23394 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -974,4 +974,28 @@ TEST_F(DistributorTest, pending_to_no_pending_global_merges_edge_immediately_sen
do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::global_space());
}
+TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_handler) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ ConfigBuilder builder;
+ builder.allowStaleReadsDuringClusterStateTransitions = true;
+ configureDistributor(builder);
+ EXPECT_TRUE(getExternalOperationHandler().concurrent_gets_enabled());
+
+ builder.allowStaleReadsDuringClusterStateTransitions = false;
+ configureDistributor(builder);
+ EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
+}
+
+TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
+ createLinks(false);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ ConfigBuilder builder;
+ builder.allowStaleReadsDuringClusterStateTransitions = true;
+ configureDistributor(builder);
+ EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
+}
+
}
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 15820b64ff9..9f0c56c3fa5 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -20,7 +20,7 @@ DistributorTestUtil::DistributorTestUtil()
DistributorTestUtil::~DistributorTestUtil() { }
void
-DistributorTestUtil::createLinks()
+DistributorTestUtil::createLinks(bool use_btree_db)
{
_node.reset(new TestDistributorApp(_config.getConfigId()));
_threadPool = framework::TickingThreadPool::createDefault("distributor");
@@ -29,7 +29,7 @@ DistributorTestUtil::createLinks()
*_threadPool,
*this,
true,
- false, // TODO swap default
+ use_btree_db,
_hostInfo,
&_messageSender));
_component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil"));
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 3dc71bcb433..a6e7ea16798 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -34,7 +34,7 @@ public:
/**
* Sets up the storage link chain.
*/
- void createLinks();
+ void createLinks(bool use_btree_db = false);
void setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
void close();
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index ab6776717aa..d903db4f6ed 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -102,7 +102,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_ownershipSafeTimeCalc(
std::make_unique<OwnershipTransferSafeTimePointCalculator>(
std::chrono::seconds(0))), // Set by config later
- _must_send_updated_host_info(false)
+ _must_send_updated_host_info(false),
+ _use_btree_database(use_btree_database)
{
if (use_btree_database) {
LOG(info, "Using new B-tree bucket database implementation instead of legacy implementation"); // TODO remove this once default is swapped
@@ -244,6 +245,9 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg)
bool
Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
+ if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) {
+ return true;
+ }
framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
MBUS_TRACE(msg->getTrace(), 9,
"Distributor: Added to message queue. Thread state: "
@@ -837,6 +841,9 @@ Distributor::enableNextConfig()
_ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew());
_pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration());
_bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions());
+ // Concurrent reads are only safe if the B-tree DB implementation is used.
+ _externalOperationHandler.set_concurrent_gets_enabled(
+ _use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions());
}
void
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 48d9145eec7..1a3a22d28a8 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -328,6 +328,7 @@ private:
DistributorHostInfoReporter _hostInfoReporter;
std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc;
bool _must_send_updated_host_info;
+ bool _use_btree_database;
};
} // distributor
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 6b476ae37c5..c48dad70357 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -35,7 +35,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
_operationGenerator(gen),
_rejectFeedBeforeTimeReached(), // At epoch
_non_main_thread_ops_mutex(),
- _non_main_thread_ops_owner(owner, getClock())
+ _non_main_thread_ops_owner(owner, getClock()),
+ _concurrent_gets_enabled(false)
{
}
@@ -290,8 +291,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
return true;
}
-IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
-{
+std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr<api::GetCommand>& cmd) {
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
auto& metrics = getMetrics().gets[cmd->getLoadType()];
auto snapshot = getDistributor().read_snapshot_for_bucket(bucket);
@@ -304,13 +304,18 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state());
metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates
}
- return true;
+ return std::shared_ptr<Operation>();
}
// The snapshot is aware of whether stale reads are enabled, so we don't have to check that here.
const auto* space_repo = snapshot.bucket_space_repo();
assert(space_repo != nullptr);
- _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
- snapshot.steal_read_guard(), cmd, metrics);
+ return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()),
+ snapshot.steal_read_guard(), cmd, metrics);
+}
+
+IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
+{
+ _op = try_generate_get_operation(cmd);
return true;
}
@@ -345,4 +350,25 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
return true;
}
+bool ExternalOperationHandler::try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg) {
+ if (!concurrent_gets_enabled()) {
+ return false;
+ }
+ const auto type_id = msg->getType().getId();
+ if (type_id == api::MessageType::GET_ID) {
+ auto op = try_generate_get_operation(std::dynamic_pointer_cast<api::GetCommand>(msg));
+ if (op) {
+ std::lock_guard g(_non_main_thread_ops_mutex);
+ _non_main_thread_ops_owner.start(std::move(op), msg->getPriority());
+ }
+ return true;
+ } else if (type_id == api::MessageType::GET_REPLY_ID) {
+ std::lock_guard g(_non_main_thread_ops_mutex);
+ // The Get for which this reply was created may have been sent by someone outside
+ // the ExternalOperationHandler, such as TwoPhaseUpdateOperation. Pass it on if so.
+ return _non_main_thread_ops_owner.handleReply(std::dynamic_pointer_cast<api::StorageReply>(msg));
+ }
+ return false;
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index b64b4bc90cd..2c1a87267eb 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -7,6 +7,7 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/distributor/distributorcomponent.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
+#include <atomic>
#include <chrono>
#include <mutex>
@@ -51,6 +52,17 @@ public:
_rejectFeedBeforeTimeReached = timePoint;
}
+ // Returns true iff message was handled and should not be processed further by the caller.
+ bool try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg);
+
+ void set_concurrent_gets_enabled(bool enabled) noexcept {
+ _concurrent_gets_enabled.store(enabled, std::memory_order_relaxed);
+ }
+
+ bool concurrent_gets_enabled() const noexcept {
+ return _concurrent_gets_enabled.load(std::memory_order_relaxed);
+ }
+
private:
const MaintenanceOperationGenerator& _operationGenerator;
OperationSequencer _mutationSequencer;
@@ -58,6 +70,7 @@ private:
TimePoint _rejectFeedBeforeTimeReached;
mutable std::mutex _non_main_thread_ops_mutex;
OperationOwner _non_main_thread_ops_owner;
+ std::atomic<bool> _concurrent_gets_enabled;
template <typename Func>
void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
@@ -72,6 +85,7 @@ private:
const lib::ClusterState& current_state,
const lib::ClusterState& pending_state);
void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result);
+ std::shared_ptr<Operation> try_generate_get_operation(const std::shared_ptr<api::GetCommand>&);
bool checkSafeTimeReached(api::StorageCommand& cmd);
api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime);
diff --git a/storage/src/vespa/storage/distributor/operationstarter.h b/storage/src/vespa/storage/distributor/operationstarter.h
index ee938d02226..75c794db7c7 100644
--- a/storage/src/vespa/storage/distributor/operationstarter.h
+++ b/storage/src/vespa/storage/distributor/operationstarter.h
@@ -11,7 +11,7 @@ class OperationStarter
{
public:
using Priority = uint8_t;
- virtual ~OperationStarter() {}
+ virtual ~OperationStarter() = default;
virtual bool start(const std::shared_ptr<Operation>& operation, Priority priority) = 0;
};