diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-02 16:19:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-02 16:19:03 +0100 |
commit | 4c87d180e880012b9ef932c8490a773509cfedd0 (patch) | |
tree | ff993a6ea2bf254e06d6875511aeae533c9b1d8b /storage | |
parent | cc09087f82931a6b00498128f88c30e8d024bd34 (diff) |
Revert "Properly track execution of BucketTasks and provide sync() and order… "
Diffstat (limited to 'storage')
10 files changed, 54 insertions, 161 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 5a7a4394da4..acccbb8b9b9 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -456,40 +456,6 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { EXPECT_EQ(1, numInvocations); } -TEST_F(FileStorManagerTest, sync_waits_for_already_started_tasks) { - TestFileStorComponents c(*this); - - setClusterState("storage:3 distributor:3"); - EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); - - auto executor = getDummyPersistence().get_bucket_executor(); - ASSERT_TRUE(executor); - - spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); - - createBucket(b1.getBucketId()); - - std::atomic<size_t> numInvocations(0); - vespalib::Gate gate; - auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr<IDestructorCallback>) { - gate.await(); - numInvocations++; - })); - EXPECT_FALSE(response); - EXPECT_EQ(0, numInvocations); - std::atomic<bool> syncComplete(false); - std::thread thread([&syncComplete, &executor]() { - executor->sync(); - syncComplete = true; - }); - std::this_thread::sleep_for(100us); - EXPECT_FALSE(syncComplete); - gate.countDown(); - thread.join(); - EXPECT_TRUE(syncComplete); - EXPECT_EQ(1, numInvocations); -} - TEST_F(FileStorManagerTest, state_change) { TestFileStorComponents c(*this); diff --git a/storage/src/vespa/storage/bucketdb/bucketcopy.h b/storage/src/vespa/storage/bucketdb/bucketcopy.h index 3e93e4594a6..94a1e63e53e 100644 --- a/storage/src/vespa/storage/bucketdb/bucketcopy.h +++ b/storage/src/vespa/storage/bucketdb/bucketcopy.h @@ -28,9 +28,9 @@ public: { } - bool trusted() const noexcept { return _flags & TRUSTED; } + bool trusted() const { return _flags & TRUSTED; } - BucketCopy& setTrusted(bool val = true) noexcept { + BucketCopy& setTrusted(bool val = true) { if (!val) { clearTrusted(); } else { @@ -40,44 +40,46 @@ public: return *this; } - void clearTrusted() noexcept { _flags &= ~TRUSTED; } + void clearTrusted() { _flags &= ~TRUSTED; } - bool valid() const noexcept { return getBucketInfo().valid(); } - bool empty() const noexcept { return getBucketInfo().empty(); } - bool wasRecentlyCreated() const noexcept { + bool valid() const { return getBucketInfo().valid(); } + bool empty() const { return getBucketInfo().empty(); } + bool wasRecentlyCreated() const { return (getChecksum() == 1 && getDocumentCount() == 0 && getTotalDocumentSize() == 0); } - static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) noexcept { + static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) + { return BucketCopy(timestamp, nodeIdx, api::BucketInfo(1, 0, 0, 0, 0)); } - uint16_t getNode() const noexcept { return _node; } - uint64_t getTimestamp() const noexcept { return _timestamp; } + uint16_t getNode() const { return _node; } + uint64_t getTimestamp() const { return _timestamp; } - uint32_t getChecksum() const noexcept { return _info.getChecksum(); } - uint32_t getDocumentCount() const noexcept { return _info.getDocumentCount(); } - uint32_t getTotalDocumentSize() const noexcept { return _info.getTotalDocumentSize(); } - uint32_t getMetaCount() const noexcept { return _info.getMetaCount(); } - uint32_t getUsedFileSize() const noexcept { return _info.getUsedFileSize(); } - bool active() const noexcept { return _info.isActive(); } - bool ready() const noexcept { return _info.isReady(); } + uint32_t getChecksum() const { return _info.getChecksum(); } + uint32_t getDocumentCount() const { return _info.getDocumentCount(); } + uint32_t getTotalDocumentSize() const + { return _info.getTotalDocumentSize(); } + uint32_t getMetaCount() const { return _info.getMetaCount(); } + uint32_t getUsedFileSize() const { return _info.getUsedFileSize(); } + bool active() const { return _info.isActive(); } + bool ready() const { return _info.isReady(); } - const api::BucketInfo& getBucketInfo() const noexcept { return _info; } + const api::BucketInfo& getBucketInfo() const { return _info; } - void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) noexcept { + void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) { _info = bInfo; _timestamp = timestamp; } - void setActive(bool setactive) noexcept { + void setActive(bool setactive) { _info.setActive(setactive); } bool consistentWith(const BucketCopy& other, - bool countInvalidAsConsistent = false) const noexcept + bool countInvalidAsConsistent = false) const { // If both are valid, check checksum and doc count. if (valid() && other.valid()) { @@ -92,7 +94,7 @@ public: std::string toString() const; - bool operator==(const BucketCopy& other) const noexcept { + bool operator==(const BucketCopy& other) const { return getBucketInfo() == other.getBucketInfo() && _flags == other._flags; diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 2d70ee8d3ba..5be6f310c71 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -166,17 +166,17 @@ namespace { uint64_t active; uint64_t ready; - Count() noexcept : docs(0), bytes(0), buckets(0), active(0), ready(0) {} + Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {} }; Count count; uint32_t lowestUsedBit; - MetricsUpdater() noexcept + MetricsUpdater() : count(), lowestUsedBit(58) {} void operator()(document::BucketId::Type bucketId, - const StorBucketDatabase::Entry& data) noexcept + const StorBucketDatabase::Entry& data) { document::BucketId bucket( document::BucketId::keyToBucketId(bucketId)); @@ -198,7 +198,7 @@ namespace { } }; - void add(const MetricsUpdater& rhs) noexcept { + void add(const MetricsUpdater& rhs) { auto& d = count; auto& s = rhs.count; d.buckets += s.buckets; diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h index 785419e78cf..124ee1bdf45 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h @@ -8,7 +8,7 @@ namespace storage::distributor::dbtransition { struct Entry { Entry(const document::BucketId& bid, - const BucketCopy& copy_) noexcept + const BucketCopy& copy_) : bucket_key(bid.toKey()), copy(copy_) {} diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 13d83157150..51971a276b4 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -196,6 +196,18 @@ private: // to be present for that exact purpose. mutable std::mutex _lock; + /** + * Increment latency and operation count stats for the node the message + * was sent towards based on the registered send time and the current time. + * + * In the event that system time has moved backwards across sending a + * command and reciving its reply, the latency will not be recorded but + * the total number of messages will increase. + * + * _lock MUST be held upon invocation. + */ + void updateNodeStatsOnReply(const MessageEntry& entry); + void getStatusStartPage(std::ostream& out) const; void getStatusPerNode(std::ostream& out) const; void getStatusPerBucket(std::ostream& out) const; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 7f8da8e76e7..b8ed6b8ec91 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -104,7 +104,7 @@ AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) con }); spi::Bucket bucket(cmd.getBucket()); auto onDone = std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)); - cmd.run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone))); + cmd.task().run(bucket, std::make_shared<vespalib::KeepAlive<decltype(onDone)>>(std::move(onDone))); return tracker; } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index babee2c6351..a46b4205570 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "filestormanager.h" #include "filestorhandlerimpl.h" +#include "filestormanager.h" #include <vespa/storage/bucketdb/minimumusedbitstracker.h> #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/content_bucket_space_repo.h> @@ -14,7 +14,6 @@ #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/persistencehandler.h> #include <vespa/storage/persistence/provider_error_wrapper.h> -#include <vespa/persistence/spi/bucket_tasks.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/message/persistence.h> @@ -24,8 +23,6 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <vespa/vespalib/util/count_down_latch.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <thread> #include <vespa/log/bufferedlogger.h> @@ -79,13 +76,6 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _configFetcher(configUri.getContext()), _use_async_message_handling_on_schedule(false), _metrics(std::make_unique<FileStorMetrics>()), - _filestorHandler(), - _sequencedExecutor(), - _executeLock(), - _syncCond(), - _notifyAfterExecute(false), - _executeCount(0), - _tasksInExecute(), _closed(false), _lock(), _host_info_reporter(_component.getStateUpdater()), @@ -820,13 +810,6 @@ FileStorManager::sendUp(const std::shared_ptr<api::StorageMessage>& msg) void FileStorManager::onClose() { LOG(debug, "Start closing"); - std::unique_ptr<vespalib::IDestructorCallback> toDestruct; - { - std::lock_guard guard(_executeLock); - toDestruct = std::move(_bucketExecutorRegistration); - } - toDestruct.reset(); - _resource_usage_listener_registration.reset(); // Avoid getting config during shutdown _configFetcher.close(); LOG(debug, "Closed _configFetcher."); @@ -990,70 +973,18 @@ void FileStorManager::initialize_bucket_databases_from_provider() { _init_handler.notifyDoneInitializing(); } -class FileStorManager::TrackExecutedTasks : public vespalib::IDestructorCallback { -public: - TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager); - ~TrackExecutedTasks() override; -private: - FileStorManager & _manager; - size_t _serialNum; -}; - -FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard<std::mutex> & guard, FileStorManager & manager) - : _manager(manager), - _serialNum(_manager._executeCount++) -{ - (void) guard; - _manager._tasksInExecute.insert(_serialNum); -} - -FileStorManager::TrackExecutedTasks::~TrackExecutedTasks() { - std::lock_guard guard(_manager._executeLock); - _manager._tasksInExecute.erase(_serialNum); - if (_manager._notifyAfterExecute) { - _manager._syncCond.notify_all(); - } -} - std::unique_ptr<spi::BucketTask> FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::execute")); if (entry.exist()) { - std::unique_ptr<TrackExecutedTasks> trackTasks; - { - std::lock_guard guard(_executeLock); - if (_bucketExecutorRegistration) { - trackTasks = std::make_unique<TrackExecutedTasks>(guard, *this); - } - } - if (trackTasks) { - _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(trackTasks), std::move(task))); - } + _filestorHandler->schedule(std::make_shared<RunTaskCommand>(bucket, std::move(task))); } return task; } -namespace { -bool -areTasksCompleteUntil(const vespalib::hash_set<size_t> &inFlight, size_t limit) { - for (size_t serial : inFlight) { - if (serial < limit) { - return false; - } - } - return true; -} -} - void FileStorManager::sync() { - std::unique_lock guard(_executeLock); - _notifyAfterExecute = true; - _syncCond.wait(guard, [this, limit=_executeCount]() { - return areTasksCompleteUntil(_tasksInExecute, limit); - }); - _notifyAfterExecute = false; } } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index fea1b9c7ea6..6eaef45e9bd 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -72,19 +72,13 @@ class FileStorManager : public StorageLinkQueued, std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor; - std::mutex _executeLock; - std::condition_variable _syncCond; - bool _notifyAfterExecute; - size_t _executeCount; - vespalib::hash_set<size_t> _tasksInExecute; - bool _closed; std::mutex _lock; std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration; ServiceLayerHostInfoReporter _host_info_reporter; std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration; - class TrackExecutedTasks; + public: FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&); diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index e7d8158fb13..7ccb3ee895d 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "messages.h" -#include <vespa/vespalib/util/idestructorcallback.h> #include <ostream> #include <cassert> @@ -184,26 +183,16 @@ RunTaskCommand::makeReply() { return std::make_unique<RunTaskReply>(*this); } -RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, - std::unique_ptr<vespalib::IDestructorCallback> afterRun, - std::unique_ptr<spi::BucketTask> task) +RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) : api::InternalCommand(ID), _task(std::move(task)), - _afterRun(std::move(afterRun)), _bucket(bucket) -{ } - -RunTaskCommand::~RunTaskCommand() = default; - -void -RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) { - if (_task) { - _task->run(bucket, std::move(onComplete)); - } - _afterRun.reset(); + assert(_task); } +RunTaskCommand::~RunTaskCommand() = default; + void RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { out << "RunTaskCommand(" << _bucket <<")"; diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index d26543c7797..043747d10d2 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -244,20 +244,19 @@ public: class RunTaskCommand : public api::InternalCommand { public: static constexpr uint32_t ID = 1011; - RunTaskCommand(const spi::Bucket &bucket, - std::unique_ptr<vespalib::IDestructorCallback> afterRun, - std::unique_ptr<spi::BucketTask> task); + RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task); ~RunTaskCommand(); document::Bucket getBucket() const override { return _bucket.getBucket(); } std::unique_ptr<api::StorageReply> makeReply() override; - void run(const spi::Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete); + spi::BucketTask & task() & { + return *_task; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: - std::unique_ptr<spi::BucketTask> _task; - std::unique_ptr<vespalib::IDestructorCallback> _afterRun; - spi::Bucket _bucket; + std::unique_ptr<spi::BucketTask> _task; + spi::Bucket _bucket; }; // Simple reply for matching the RunTaskCommand |