about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-28 15:01:26 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-28 15:01:26 +0000
commit 4db268e909f05ab3003b4956f127ed21264cacb4 (patch)
tree 79fddf0bdd55ef6466c238a9a50df28e34923e11 /storage
parent 3f1c5a2c64cd24fd85f5af58b9a7bdd89dca80c5 (diff)
Fix ever-growing message tracker for concurrent Get operations
When Get requests initiated outside the main distributor thread were sent via the MessageSender that is implemented by the main Distributor instance, they were implicitly registered with the pending message tracker. Not only was this not thread safe, but the registrations were also never cleared away, since the reply pipeline bypassed the tracker entirely. This caused a silent memory leak that built up many small allocations over time. We now dispatch Get requests directly through the storage link chain, bypassing the message tracking component. This both fixes the leak and avoids extra overhead for the Get requests. Note: the concurrent Get feature is _not_ enabled by default. Also fixes an issue where concurrent Get operations were not gracefully aborted when the node shuts down.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 35
-rw-r--r-- storage/src/tests/distributor/externaloperationhandlertest.cpp 1
-rw-r--r-- storage/src/vespa/storage/common/messagesender.h 4
-rw-r--r-- storage/src/vespa/storage/common/storagelink.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 17
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 4
-rw-r--r-- storage/src/vespa/storage/distributor/externaloperationhandler.cpp 34
-rw-r--r-- storage/src/vespa/storage/distributor/externaloperationhandler.h 4
8 files changed, 80 insertions, 21 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 94d33e50047..8fa8a6bcede 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -189,6 +189,8 @@ struct DistributorTest : Test, DistributorTestUtil {
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
void do_test_pending_merge_getnodestate_reply_edge(BucketSpace space);
+
+ void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled);
};
DistributorTest::DistributorTest()
@@ -1025,14 +1027,18 @@ TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled)
EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
}
-TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree_db_and_stale_reads_enabled) {
+void DistributorTest::set_up_and_start_get_op_with_stale_reads_enabled(bool enabled) {
createLinks(true);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
- configure_stale_reads_enabled(true);
+ configure_stale_reads_enabled(enabled);
document::BucketId bucket(16, 1);
addNodesToBucketDB(bucket, "0=1/1/1/t");
_distributor->onDown(make_dummy_get_command_for_bucket_1());
+}
+
+TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree_db_and_stale_reads_enabled) {
+ set_up_and_start_get_op_with_stale_reads_enabled(true);
ASSERT_THAT(_sender.commands(), SizeIs(1));
EXPECT_THAT(_sender.replies(), SizeIs(0));
@@ -1044,16 +1050,27 @@ TEST_F(DistributorTest, gets_are_started_outside_main_distributor_logic_if_btree
}
TEST_F(DistributorTest, gets_are_not_started_outside_main_distributor_logic_if_stale_reads_disabled) {
- createLinks(true);
- setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
- configure_stale_reads_enabled(false);
-
- document::BucketId bucket(16, 1);
- addNodesToBucketDB(bucket, "0=1/1/1/t");
- _distributor->onDown(make_dummy_get_command_for_bucket_1());
+ set_up_and_start_get_op_with_stale_reads_enabled(false);
// Get has been placed into distributor queue, so no external messages are produced.
EXPECT_THAT(_sender.commands(), SizeIs(0));
EXPECT_THAT(_sender.replies(), SizeIs(0));
}
+// There's no need or desire to track "lockfree" Gets in the main pending message tracker,
+// as we only have to track mutations to inhibit maintenance ops safely. Furthermore,
+// the message tracker is a multi-index and therefore has some runtime cost.
+TEST_F(DistributorTest, gets_started_outside_main_thread_are_not_tracked_by_main_pending_message_tracker) {
+ set_up_and_start_get_op_with_stale_reads_enabled(true);
+ Bucket bucket(FixedBucketSpaces::default_space(), BucketId(16, 1));
+ EXPECT_FALSE(_distributor->getPendingMessageTracker().hasPendingMessage(
+ 0, bucket, api::MessageType::GET_ID));
+}
+
+TEST_F(DistributorTest, closing_aborts_gets_started_outside_main_distributor_thread) {
+ set_up_and_start_get_op_with_stale_reads_enabled(true);
+ _distributor->close();
+ ASSERT_EQ(1, _sender.replies().size());
+ EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
+}
+
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 84f7d34d069..f2f949a6d56 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -532,7 +532,6 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per
makeGetCommandForUser(non_owned_bucket.withoutCountBits())));
EXPECT_EQ("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)",
_sender.reply(0)->getResult().toString());
-
}
// TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with
diff --git a/storage/src/vespa/storage/common/messagesender.h b/storage/src/vespa/storage/common/messagesender.h
index 659fccad412..2f24b750d66 100644
--- a/storage/src/vespa/storage/common/messagesender.h
+++ b/storage/src/vespa/storage/common/messagesender.h
@@ -27,7 +27,7 @@ namespace storage::api {
namespace storage {
struct MessageSender {
- virtual ~MessageSender() {}
+ virtual ~MessageSender() = default;
virtual void sendCommand(const std::shared_ptr<api::StorageCommand>&) = 0;
virtual void sendReply(const std::shared_ptr<api::StorageReply>&) = 0;
@@ -36,7 +36,7 @@ struct MessageSender {
};
struct ChainedMessageSender {
- virtual ~ChainedMessageSender() {}
+ virtual ~ChainedMessageSender() = default;
virtual void sendUp(const std::shared_ptr<api::StorageMessage>&) = 0;
virtual void sendDown(const std::shared_ptr<api::StorageMessage>&) = 0;
};
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 97b357a3bf7..f73eb3ea36d 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -173,7 +173,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
ost << vespalib::getStackTrace(0);
if (!msg->getType().isReply()) {
LOGBP(warning, "%s", ost.str().c_str());
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
+ auto& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index d903db4f6ed..5c6773ea5bf 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -217,15 +217,13 @@ void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
_bucketDBUpdater.flush();
+ _externalOperationHandler.close_pending();
_operationOwner.onClose();
_maintenanceOperationOwner.onClose();
}
-void
-Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
-{
- _pendingMessageTracker.insert(msg);
- if (_messageSender != 0) {
+void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg) {
+ if (_messageSender) {
_messageSender->sendUp(msg);
} else {
StorageLink::sendUp(msg);
@@ -233,9 +231,16 @@ Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
}
void
+Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _pendingMessageTracker.insert(msg);
+ send_up_without_tracking(msg);
+}
+
+void
Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- if (_messageSender != 0) {
+ if (_messageSender) {
_messageSender->sendDown(msg);
} else {
StorageLink::sendDown(msg);
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 9fd055023e2..ac6e306a4fb 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -55,13 +55,15 @@ public:
HostInfo& hostInfoReporterRegistrar,
ChainedMessageSender* = nullptr);
- ~Distributor();
+ ~Distributor() override;
void onOpen() override;
void onClose() override;
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
+ // Bypasses message tracker component. Thread safe.
+ void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&);
ChainedMessageSender& getMessageSender() override {
return (_messageSender == 0 ? *this : *_messageSender);
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index ecf02ad3c30..adeab7ba132 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -26,16 +26,42 @@ LOG_SETUP(".distributor.manager");
namespace storage::distributor {
+class DirectDispatchSender : public DistributorMessageSender {
+ Distributor& _distributor;
+public:
+ explicit DirectDispatchSender(Distributor& distributor)
+ : _distributor(distributor)
+ {}
+ ~DirectDispatchSender() override = default;
+
+ void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
+ _distributor.send_up_without_tracking(cmd);
+ }
+ void sendReply(const std::shared_ptr<api::StorageReply>& reply) override {
+ _distributor.send_up_without_tracking(reply);
+ }
+ int getDistributorIndex() const override {
+ return _distributor.getDistributorIndex(); // Thread safe
+ }
+ const std::string& getClusterName() const override {
+ return _distributor.getClusterName(); // Thread safe
+ }
+ const PendingMessageTracker& getPendingMessageTracker() const override {
+ abort(); // Never called by the messages using this component.
+ }
+};
+
ExternalOperationHandler::ExternalOperationHandler(Distributor& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const MaintenanceOperationGenerator& gen,
DistributorComponentRegister& compReg)
: DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"),
+ _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(owner)),
_operationGenerator(gen),
_rejectFeedBeforeTimeReached(), // At epoch
_non_main_thread_ops_mutex(),
- _non_main_thread_ops_owner(owner, getClock()),
+ _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()),
_concurrent_gets_enabled(false)
{
}
@@ -51,6 +77,12 @@ ExternalOperationHandler::handleMessage(const std::shared_ptr<api::StorageMessag
return retVal;
}
+void ExternalOperationHandler::close_pending() {
+ std::lock_guard g(_non_main_thread_ops_mutex);
+ // Make sure we drain any pending operations upon close.
+ _non_main_thread_ops_owner.onClose();
+}
+
api::ReturnCode
ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime)
{
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index 2c1a87267eb..e38f6792717 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -20,6 +20,7 @@ namespace distributor {
class Distributor;
class MaintenanceOperationGenerator;
+class DirectDispatchSender;
class ExternalOperationHandler : public DistributorComponent,
public api::MessageHandler
@@ -55,6 +56,8 @@ public:
// Returns true iff message was handled and should not be processed further by the caller.
bool try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg);
+ void close_pending();
+
void set_concurrent_gets_enabled(bool enabled) noexcept {
_concurrent_gets_enabled.store(enabled, std::memory_order_relaxed);
}
@@ -64,6 +67,7 @@ public:
}
private:
+ std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
const MaintenanceOperationGenerator& _operationGenerator;
OperationSequencer _mutationSequencer;
Operation::SP _op;