From cad7af3ec2d6fd8e1d70c3575088d0edab4bbe41 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 2 Feb 2021 05:31:48 +0000 Subject: Properly track execution of BucketTasks and provide sync() and orderly shutdown. --- document/src/vespa/document/bucket/bucket.h | 2 +- document/src/vespa/document/bucket/bucketid.cpp | 8 +-- document/src/vespa/document/bucket/bucketid.h | 42 ++++++------- fastos/src/vespa/fastos/ringbuffer.h | 17 ----- persistence/src/vespa/persistence/spi/bucket.h | 8 +-- .../filestorage/filestormanagertest.cpp | 34 ++++++++++ storage/src/vespa/storage/bucketdb/bucketcopy.h | 44 +++++++------ .../src/vespa/storage/bucketdb/bucketmanager.cpp | 8 +-- .../pending_bucket_space_db_transition_entry.h | 2 +- .../storage/distributor/pendingmessagetracker.h | 12 ---- .../src/vespa/storage/persistence/asynchandler.cpp | 2 +- .../persistence/filestorage/filestormanager.cpp | 73 +++++++++++++++++++++- .../persistence/filestorage/filestormanager.h | 8 ++- storage/src/vespa/storage/persistence/messages.cpp | 21 +++++-- storage/src/vespa/storage/persistence/messages.h | 13 ++-- .../src/vespa/storageapi/buckets/bucketinfo.cpp | 5 +- .../src/vespa/storageapi/buckets/bucketinfo.h | 40 ++++++------ vespalib/src/vespa/vespalib/util/executor.h | 4 +- 18 files changed, 217 insertions(+), 126 deletions(-) diff --git a/document/src/vespa/document/bucket/bucket.h b/document/src/vespa/document/bucket/bucket.h index 44068e1c443..f189c2951c9 100644 --- a/document/src/vespa/document/bucket/bucket.h +++ b/document/src/vespa/document/bucket/bucket.h @@ -32,7 +32,7 @@ public: vespalib::string toString() const; struct hash { - size_t operator () (const Bucket& b) const { + size_t operator () (const Bucket& b) const noexcept { size_t hash1 = BucketId::hash()(b.getBucketId()); size_t hash2 = BucketSpace::hash()(b.getBucketSpace()); // Formula taken from std::hash_combine proposal diff --git a/document/src/vespa/document/bucket/bucketid.cpp b/document/src/vespa/document/bucket/bucketid.cpp index 668798d6c39..1b9cf1e5304 100644 --- a/document/src/vespa/document/bucket/bucketid.cpp +++ b/document/src/vespa/document/bucket/bucketid.cpp @@ -71,7 +71,7 @@ Initialize _initializeUsedMasks; } -void BucketId::initialize() { +void BucketId::initialize() noexcept { fillUsedMasks(BucketId::_usedMasks, BucketId::maxNumBits); fillStripMasks(BucketId::_stripMasks, BucketId::maxNumBits); } @@ -91,7 +91,7 @@ void BucketId::throwFailedSetUsedBits(uint32_t used, uint32_t availBits) { } BucketId::Type -BucketId::reverse(Type id) +BucketId::reverse(Type id) noexcept { id = ((id & 0x5555555555555555l) << 1) | ((id & 0xaaaaaaaaaaaaaaaal) >> 1); id = ((id & 0x3333333333333333l) << 2) | ((id & 0xccccccccccccccccl) >> 2); @@ -100,7 +100,7 @@ BucketId::reverse(Type id) } BucketId::Type -BucketId::keyToBucketId(Type key) +BucketId::keyToBucketId(Type key) noexcept { Type retVal = reverse(key); @@ -113,7 +113,7 @@ BucketId::keyToBucketId(Type key) } bool -BucketId::contains(const BucketId& id) const +BucketId::contains(const BucketId& id) const noexcept { if (id.getUsedBits() < getUsedBits()) { return false; diff --git a/document/src/vespa/document/bucket/bucketid.h b/document/src/vespa/document/bucket/bucketid.h index b31f9080acc..675a0d23ebd 100644 --- a/document/src/vespa/document/bucket/bucketid.h +++ b/document/src/vespa/document/bucket/bucketid.h @@ -37,7 +37,7 @@ class BucketId { public: struct hash { - size_t operator () (const BucketId& g) const { + size_t operator () (const BucketId& g) const noexcept { return g.getId(); } }; @@ -55,23 +55,23 @@ public: /** Create a bucket id using a set of bits from a raw unchecked value. */ BucketId(uint32_t useBits, Type id) noexcept : _id(createUsedBits(useBits, id)) { } - bool operator<(const BucketId& id) const { + bool operator<(const BucketId& id) const noexcept { return getId() < id.getId(); } - bool operator==(const BucketId& id) const { return getId() == id.getId(); } - bool operator!=(const BucketId& id) const { return getId() != id.getId(); } + bool operator==(const BucketId& id) const noexcept { return getId() == id.getId(); } + bool operator!=(const BucketId& id) const noexcept { return getId() != id.getId(); } vespalib::string toString() const; - bool valid() const { + bool valid() const noexcept { return validUsedBits(getUsedBits()); } - static bool validUsedBits(uint32_t usedBits) { + static bool validUsedBits(uint32_t usedBits) noexcept { return (usedBits >= minNumBits) && (usedBits <= maxNumBits); } - bool isSet() const { + bool isSet() const noexcept { return _id != 0u; } /** @@ -79,14 +79,14 @@ public: * verify that two different documents belong to the same bucket given some * level of bucket splitting, use this to ignore the unused bits. */ - BucketId stripUnused() const { return BucketId(getUsedBits(), getId()); } + BucketId stripUnused() const noexcept { return BucketId(getUsedBits(), getId()); } /** * Checks whether the given bucket is contained within this bucket. That is, * if it is the same bucket, or if it is a bucket using more bits, which is * identical to this one if set to use as many bits as this one. */ - bool contains(const BucketId& id) const; + bool contains(const BucketId& id) const noexcept; // Functions exposing internals we want to make users independent of @@ -97,7 +97,7 @@ public: static constexpr uint32_t maxNumBits = (8 * sizeof(Type) - CountBits); static constexpr uint32_t minNumBits = 1u; // See comment above. - uint32_t getUsedBits() const { return _id >> maxNumBits; } + uint32_t getUsedBits() const noexcept { return _id >> maxNumBits; } void setUsedBits(uint32_t used) { uint32_t availBits = maxNumBits; @@ -113,22 +113,22 @@ public: } /** Get the bucket id value stripped of the bits that are not in use. */ - Type getId() const { return (_id & getStripMask()); } + Type getId() const noexcept { return (_id & getStripMask()); } /** * Get the bucket id value stripped of the count bits plus the bits that * are not in use. */ - Type withoutCountBits() const { return (_id & getUsedMask()); } + Type withoutCountBits() const noexcept { return (_id & getUsedMask()); } - Type getRawId() const { return _id; } + Type getRawId() const noexcept { return _id; } /** * Reverses the bits in the given number, except the countbits part. * Used for sorting in the bucket database as we want related buckets * to be sorted next to each other. */ - static Type bucketIdToKey(Type id) { + static Type bucketIdToKey(Type id) noexcept { Type retVal = reverse(id); Type usedCountLSB = id >> maxNumBits; @@ -139,39 +139,39 @@ public: return retVal; } - static Type keyToBucketId(Type key); + static Type keyToBucketId(Type key) noexcept ; /** * Reverses the bucket id bitwise, except the countbits part, * and returns the value, */ - Type toKey() const { return bucketIdToKey(getId()); }; + Type toKey() const noexcept { return bucketIdToKey(getId()); }; /** * Reverses the order of the bits in the bucket id. */ - static Type reverse(Type id); + static Type reverse(Type id) noexcept; /** * Returns the value of the Nth bit, counted in the reverse order of the * bucket id. */ - uint8_t getBit(uint32_t n) const { + uint8_t getBit(uint32_t n) const noexcept { return (_id & ((Type)1 << n)) == 0 ? 0 : 1; } - static void initialize(); + static void initialize() noexcept; private: static Type _usedMasks[maxNumBits+1]; static Type _stripMasks[maxNumBits+1]; Type _id; - Type getUsedMask() const { + Type getUsedMask() const noexcept { return _usedMasks[getUsedBits()]; } - Type getStripMask() const { + Type getStripMask() const noexcept { return _stripMasks[getUsedBits()]; } diff --git a/fastos/src/vespa/fastos/ringbuffer.h b/fastos/src/vespa/fastos/ringbuffer.h index 41c0af7385b..89b1bb84c9c 100644 --- a/fastos/src/vespa/fastos/ringbuffer.h +++ b/fastos/src/vespa/fastos/ringbuffer.h @@ -42,18 +42,6 @@ public: _closed = false; } - FastOS_RingBufferData *GetData () { return _data; } - - void RepositionDataAt0 () - { - uint8_t *src = &_data->_buffer[_dataIndex]; - uint8_t *dst = _data->_buffer; - - for(int i=0; i<_dataSize; i++) - *dst++ = *src++; - _dataIndex = 0; - } - FastOS_RingBuffer (int bufferSize) : _closed(false), _data(0), @@ -93,11 +81,6 @@ public: _dataSize += bytes; } - int GetDataSize () - { - return _dataSize; - } - int GetWriteSpace () { int spaceLeft = _bufferSize - _dataSize; diff --git a/persistence/src/vespa/persistence/spi/bucket.h b/persistence/src/vespa/persistence/spi/bucket.h index 30393109cc9..507cc80ad76 100644 --- a/persistence/src/vespa/persistence/spi/bucket.h +++ b/persistence/src/vespa/persistence/spi/bucket.h @@ -21,12 +21,12 @@ public: explicit Bucket(const document::Bucket& b) noexcept : _bucket(b) {} - const document::Bucket &getBucket() const { return _bucket; } - document::BucketId getBucketId() const { return _bucket.getBucketId(); } - document::BucketSpace getBucketSpace() const { return _bucket.getBucketSpace(); } + const document::Bucket &getBucket() const noexcept { return _bucket; } + document::BucketId getBucketId() const noexcept { return _bucket.getBucketId(); } + document::BucketSpace getBucketSpace() const noexcept { return _bucket.getBucketSpace(); } /** Convert easily to a document bucket id to make class easy to use. */ - operator document::BucketId() const { return _bucket.getBucketId(); } + operator document::BucketId() const noexcept { return _bucket.getBucketId(); } bool operator==(const Bucket& o) const noexcept { return (_bucket == o._bucket); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index acccbb8b9b9..5a7a4394da4 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -456,6 +456,40 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { EXPECT_EQ(1, numInvocations); } +TEST_F(FileStorManagerTest, sync_waits_for_already_started_tasks) { + TestFileStorComponents c(*this); + + setClusterState("storage:3 distributor:3"); + EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); + + auto executor = getDummyPersistence().get_bucket_executor(); + ASSERT_TRUE(executor); + + spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); + + createBucket(b1.getBucketId()); + + std::atomic numInvocations(0); + vespalib::Gate gate; + auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations, &gate](const spi::Bucket &, std::shared_ptr) { + gate.await(); + numInvocations++; + })); + EXPECT_FALSE(response); + EXPECT_EQ(0, numInvocations); + std::atomic syncComplete(false); + std::thread thread([&syncComplete, &executor]() { + executor->sync(); + syncComplete = true; + }); + std::this_thread::sleep_for(100us); + EXPECT_FALSE(syncComplete); + gate.countDown(); + thread.join(); + EXPECT_TRUE(syncComplete); + EXPECT_EQ(1, numInvocations); +} + TEST_F(FileStorManagerTest, state_change) { TestFileStorComponents c(*this); diff --git a/storage/src/vespa/storage/bucketdb/bucketcopy.h b/storage/src/vespa/storage/bucketdb/bucketcopy.h index 94a1e63e53e..3e93e4594a6 100644 --- a/storage/src/vespa/storage/bucketdb/bucketcopy.h +++ b/storage/src/vespa/storage/bucketdb/bucketcopy.h @@ -28,9 +28,9 @@ public: { } - bool trusted() const { return _flags & TRUSTED; } + bool trusted() const noexcept { return _flags & TRUSTED; } - BucketCopy& setTrusted(bool val = true) { + BucketCopy& setTrusted(bool val = true) noexcept { if (!val) { clearTrusted(); } else { @@ -40,46 +40,44 @@ public: return *this; } - void clearTrusted() { _flags &= ~TRUSTED; } + void clearTrusted() noexcept { _flags &= ~TRUSTED; } - bool valid() const { return getBucketInfo().valid(); } - bool empty() const { return getBucketInfo().empty(); } - bool wasRecentlyCreated() const { + bool valid() const noexcept { return getBucketInfo().valid(); } + bool empty() const noexcept { return getBucketInfo().empty(); } + bool wasRecentlyCreated() const noexcept { return (getChecksum() == 1 && getDocumentCount() == 0 && getTotalDocumentSize() == 0); } - static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) - { + static BucketCopy recentlyCreatedCopy(uint64_t timestamp, uint16_t nodeIdx) noexcept { return BucketCopy(timestamp, nodeIdx, api::BucketInfo(1, 0, 0, 0, 0)); } - uint16_t getNode() const { return _node; } - uint64_t getTimestamp() const { return _timestamp; } + uint16_t getNode() const noexcept { return _node; } + uint64_t getTimestamp() const noexcept { return _timestamp; } - uint32_t getChecksum() const { return _info.getChecksum(); } - uint32_t getDocumentCount() const { return _info.getDocumentCount(); } - uint32_t getTotalDocumentSize() const - { return _info.getTotalDocumentSize(); } - uint32_t getMetaCount() const { return _info.getMetaCount(); } - uint32_t getUsedFileSize() const { return _info.getUsedFileSize(); } - bool active() const { return _info.isActive(); } - bool ready() const { return _info.isReady(); } + uint32_t getChecksum() const noexcept { return _info.getChecksum(); } + uint32_t getDocumentCount() const noexcept { return _info.getDocumentCount(); } + uint32_t getTotalDocumentSize() const noexcept { return _info.getTotalDocumentSize(); } + uint32_t getMetaCount() const noexcept { return _info.getMetaCount(); } + uint32_t getUsedFileSize() const noexcept { return _info.getUsedFileSize(); } + bool active() const noexcept { return _info.isActive(); } + bool ready() const noexcept { return _info.isReady(); } - const api::BucketInfo& getBucketInfo() const { return _info; } + const api::BucketInfo& getBucketInfo() const noexcept { return _info; } - void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) { + void setBucketInfo(uint64_t timestamp, const api::BucketInfo& bInfo) noexcept { _info = bInfo; _timestamp = timestamp; } - void setActive(bool setactive) { + void setActive(bool setactive) noexcept { _info.setActive(setactive); } bool consistentWith(const BucketCopy& other, - bool countInvalidAsConsistent = false) const + bool countInvalidAsConsistent = false) const noexcept { // If both are valid, check checksum and doc count. if (valid() && other.valid()) { @@ -94,7 +92,7 @@ public: std::string toString() const; - bool operator==(const BucketCopy& other) const { + bool operator==(const BucketCopy& other) const noexcept { return getBucketInfo() == other.getBucketInfo() && _flags == other._flags; diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 5be6f310c71..2d70ee8d3ba 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -166,17 +166,17 @@ namespace { uint64_t active; uint64_t ready; - Count() : docs(0), bytes(0), buckets(0), active(0), ready(0) {} + Count() noexcept : docs(0), bytes(0), buckets(0), active(0), ready(0) {} }; Count count; uint32_t lowestUsedBit; - MetricsUpdater() + MetricsUpdater() noexcept : count(), lowestUsedBit(58) {} void operator()(document::BucketId::Type bucketId, - const StorBucketDatabase::Entry& data) + const StorBucketDatabase::Entry& data) noexcept { document::BucketId bucket( document::BucketId::keyToBucketId(bucketId)); @@ -198,7 +198,7 @@ namespace { } }; - void add(const MetricsUpdater& rhs) { + void add(const MetricsUpdater& rhs) noexcept { auto& d = count; auto& s = rhs.count; d.buckets += s.buckets; diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h index 124ee1bdf45..785419e78cf 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h @@ -8,7 +8,7 @@ namespace storage::distributor::dbtransition { struct Entry { Entry(const document::BucketId& bid, - const BucketCopy& copy_) + const BucketCopy& copy_) noexcept : bucket_key(bid.toKey()), copy(copy_) {} diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 51971a276b4..13d83157150 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -196,18 +196,6 @@ private: // to be present for that exact purpose. mutable std::mutex _lock; - /** - * Increment latency and operation count stats for the node the message - * was sent towards based on the registered send time and the current time. - * - * In the event that system time has moved backwards across sending a - * command and reciving its reply, the latency will not be recorded but - * the total number of messages will increase. - * - * _lock MUST be held upon invocation. - */ - void updateNodeStatsOnReply(const MessageEntry& entry); - void getStatusStartPage(std::ostream& out) const; void getStatusPerNode(std::ostream& out) const; void getStatusPerBucket(std::ostream& out) const; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index b8ed6b8ec91..7f8da8e76e7 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -104,7 +104,7 @@ AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) con }); spi::Bucket bucket(cmd.getBucket()); auto onDone = std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task)); - cmd.task().run(bucket, std::make_shared>(std::move(onDone))); + cmd.run(bucket, std::make_shared>(std::move(onDone))); return tracker; } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index a46b4205570..babee2c6351 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "filestorhandlerimpl.h" #include "filestormanager.h" +#include "filestorhandlerimpl.h" #include #include #include @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,8 @@ #include #include #include +#include +#include #include #include @@ -76,6 +79,13 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _configFetcher(configUri.getContext()), _use_async_message_handling_on_schedule(false), _metrics(std::make_unique()), + _filestorHandler(), + _sequencedExecutor(), + _executeLock(), + _syncCond(), + _notifyAfterExecute(false), + _executeCount(0), + _tasksInExecute(), _closed(false), _lock(), _host_info_reporter(_component.getStateUpdater()), @@ -810,6 +820,13 @@ FileStorManager::sendUp(const std::shared_ptr& msg) void FileStorManager::onClose() { LOG(debug, "Start closing"); + std::unique_ptr toDestruct; + { + std::lock_guard guard(_executeLock); + toDestruct = std::move(_bucketExecutorRegistration); + } + toDestruct.reset(); + _resource_usage_listener_registration.reset(); // Avoid getting config during shutdown _configFetcher.close(); LOG(debug, "Closed _configFetcher."); @@ -973,18 +990,70 @@ void FileStorManager::initialize_bucket_databases_from_provider() { _init_handler.notifyDoneInitializing(); } +class FileStorManager::TrackExecutedTasks : public vespalib::IDestructorCallback { +public: + TrackExecutedTasks(std::lock_guard & guard, FileStorManager & manager); + ~TrackExecutedTasks() override; +private: + FileStorManager & _manager; + size_t _serialNum; +}; + +FileStorManager::TrackExecutedTasks::TrackExecutedTasks(std::lock_guard & guard, FileStorManager & manager) + : _manager(manager), + _serialNum(_manager._executeCount++) +{ + (void) guard; + _manager._tasksInExecute.insert(_serialNum); +} + +FileStorManager::TrackExecutedTasks::~TrackExecutedTasks() { + std::lock_guard guard(_manager._executeLock); + _manager._tasksInExecute.erase(_serialNum); + if (_manager._notifyAfterExecute) { + _manager._syncCond.notify_all(); + } +} + std::unique_ptr FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr task) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::execute")); if (entry.exist()) { - _filestorHandler->schedule(std::make_shared(bucket, std::move(task))); + std::unique_ptr trackTasks; + { + std::lock_guard guard(_executeLock); + if (_bucketExecutorRegistration) { + trackTasks = std::make_unique(guard, *this); + } + } + if (trackTasks) { + _filestorHandler->schedule(std::make_shared(bucket, std::move(trackTasks), std::move(task))); + } } return task; } +namespace { +bool +areTasksCompleteUntil(const vespalib::hash_set &inFlight, size_t limit) { + for (size_t serial : inFlight) { + if (serial < limit) { + return false; + } + } + return true; +} +} + void FileStorManager::sync() { + std::unique_lock guard(_executeLock); + _notifyAfterExecute = true; + _syncCond.wait(guard, [this, limit=_executeCount]() { + return areTasksCompleteUntil(_tasksInExecute, limit); + }); + _notifyAfterExecute = false; } } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 6eaef45e9bd..fea1b9c7ea6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -72,13 +72,19 @@ class FileStorManager : public StorageLinkQueued, std::shared_ptr _metrics; std::unique_ptr _filestorHandler; std::unique_ptr _sequencedExecutor; + std::mutex _executeLock; + std::condition_variable _syncCond; + bool _notifyAfterExecute; + size_t _executeCount; + vespalib::hash_set _tasksInExecute; + bool _closed; std::mutex _lock; std::unique_ptr _bucketExecutorRegistration; ServiceLayerHostInfoReporter _host_info_reporter; std::unique_ptr _resource_usage_listener_registration; - + class TrackExecutedTasks; public: FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&); diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 7ccb3ee895d..0c7910011c8 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "messages.h" +#include #include #include @@ -183,16 +184,28 @@ RunTaskCommand::makeReply() { return std::make_unique(*this); } -RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr task) +RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, + std::unique_ptr afterRun, + std::unique_ptr task) : api::InternalCommand(ID), _task(std::move(task)), + _afterRun(std::move(afterRun)), _bucket(bucket) -{ - assert(_task); -} +{ } RunTaskCommand::~RunTaskCommand() = default; +void +RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptr onComplete) +{ + if (_task) { + _task->run(bucket, std::move(onComplete)); + } + if (_afterRun) { + _afterRun.reset(); + } +} + void RunTaskCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { out << "RunTaskCommand(" << _bucket <<")"; diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index 043747d10d2..d26543c7797 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -244,19 +244,20 @@ public: class RunTaskCommand : public api::InternalCommand { public: static constexpr uint32_t ID = 1011; - RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr task); + RunTaskCommand(const spi::Bucket &bucket, + std::unique_ptr afterRun, + std::unique_ptr task); ~RunTaskCommand(); document::Bucket getBucket() const override { return _bucket.getBucket(); } std::unique_ptr makeReply() override; - spi::BucketTask & task() & { - return *_task; - } + void run(const spi::Bucket & bucket, std::shared_ptr onComplete); void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: - std::unique_ptr _task; - spi::Bucket _bucket; + std::unique_ptr _task; + std::unique_ptr _afterRun; + spi::Bucket _bucket; }; // Simple reply for matching the RunTaskCommand diff --git a/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp b/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp index 9e4e48d67f4..4c3b290d4d7 100644 --- a/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp +++ b/storageapi/src/vespa/storageapi/buckets/bucketinfo.cpp @@ -19,8 +19,7 @@ BucketInfo::BucketInfo() noexcept _active(false) {} -BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount, - uint32_t totDocSize) noexcept +BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount, uint32_t totDocSize) noexcept : _lastModified(0), _checksum(checksum), _docCount(docCount), @@ -73,7 +72,7 @@ BucketInfo::BucketInfo(uint32_t checksum, uint32_t docCount, {} bool -BucketInfo::operator==(const BucketInfo& info) const +BucketInfo::operator==(const BucketInfo& info) const noexcept { return (_checksum == info._checksum && _docCount == info._docCount && diff --git a/storageapi/src/vespa/storageapi/buckets/bucketinfo.h b/storageapi/src/vespa/storageapi/buckets/bucketinfo.h index 5b2de4b4d61..7e6e7a2aed2 100644 --- a/storageapi/src/vespa/storageapi/buckets/bucketinfo.h +++ b/storageapi/src/vespa/storageapi/buckets/bucketinfo.h @@ -42,37 +42,37 @@ public: uint32_t metaCount, uint32_t usedFileSize, bool ready, bool active, Timestamp lastModified) noexcept; - Timestamp getLastModified() const { return _lastModified; } - uint32_t getChecksum() const { return _checksum; } - uint32_t getDocumentCount() const { return _docCount; } - uint32_t getTotalDocumentSize() const { return _totDocSize; } - uint32_t getMetaCount() const { return _metaCount; } - uint32_t getUsedFileSize() const { return _usedFileSize; } - bool isReady() const { return _ready; } - bool isActive() const { return _active; } + Timestamp getLastModified() const noexcept { return _lastModified; } + uint32_t getChecksum() const noexcept { return _checksum; } + uint32_t getDocumentCount() const noexcept { return _docCount; } + uint32_t getTotalDocumentSize() const noexcept { return _totDocSize; } + uint32_t getMetaCount() const noexcept { return _metaCount; } + uint32_t getUsedFileSize() const noexcept { return _usedFileSize; } + bool isReady() const noexcept { return _ready; } + bool isActive() const noexcept { return _active; } - void setChecksum(uint32_t crc) { _checksum = crc; } - void setDocumentCount(uint32_t count) { _docCount = count; } - void setTotalDocumentSize(uint32_t size) { _totDocSize = size; } - void setMetaCount(uint32_t count) { _metaCount = count; } - void setUsedFileSize(uint32_t size) { _usedFileSize = size; } - void setReady(bool ready = true) { _ready = ready; } - void setActive(bool active = true) { _active = active; } - void setLastModified(Timestamp lastModified) { _lastModified = lastModified; } + void setChecksum(uint32_t crc) noexcept { _checksum = crc; } + void setDocumentCount(uint32_t count) noexcept { _docCount = count; } + void setTotalDocumentSize(uint32_t size) noexcept { _totDocSize = size; } + void setMetaCount(uint32_t count) noexcept { _metaCount = count; } + void setUsedFileSize(uint32_t size) noexcept { _usedFileSize = size; } + void setReady(bool ready = true) noexcept { _ready = ready; } + void setActive(bool active = true) noexcept { _active = active; } + void setLastModified(Timestamp lastModified) noexcept { _lastModified = lastModified; } /** * Only compare checksum, total document count and document * size, not meta count or used file size. */ - bool equalDocumentInfo(const BucketInfo& other) const { + bool equalDocumentInfo(const BucketInfo& other) const noexcept { return (_checksum == other._checksum && _docCount == other._docCount && _totDocSize == other._totDocSize); } - bool operator==(const BucketInfo& info) const; - bool valid() const { return (_docCount > 0 || _totDocSize == 0); } - bool empty() const { + bool operator==(const BucketInfo& info) const noexcept; + bool valid() const noexcept { return (_docCount > 0 || _totDocSize == 0); } + bool empty() const noexcept { return _metaCount == 0 && _usedFileSize == 0 && _checksum == 0; } vespalib::string toString() const; diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h index 57ad28344b9..ce610f4e84c 100644 --- a/vespalib/src/vespa/vespalib/util/executor.h +++ b/vespalib/src/vespa/vespalib/util/executor.h @@ -20,7 +20,7 @@ public: struct Task { typedef std::unique_ptr UP; virtual void run() = 0; - virtual ~Task() {} + virtual ~Task() = default; }; enum class OptimizeFor {LATENCY, THROUGHPUT, ADAPTIVE}; @@ -41,7 +41,7 @@ public: * In case you have a lazy executor that naps inbetween. **/ virtual void wakeup() = 0; - virtual ~Executor() =default; + virtual ~Executor() = default; }; } // namespace vespalib -- cgit v1.2.3 From 74091edadda3317efd1b5e41f0c5fb06cf41018d Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 2 Feb 2021 05:46:18 +0000 Subject: No need for a guard --- storage/src/vespa/storage/persistence/messages.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 0c7910011c8..e7d8158fb13 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -201,9 +201,7 @@ RunTaskCommand::run(const spi::Bucket & bucket, std::shared_ptrrun(bucket, std::move(onComplete)); } - if (_afterRun) { - _afterRun.reset(); - } + _afterRun.reset(); } void -- cgit v1.2.3