diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-06 17:20:25 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-02-06 17:51:42 +0000 |
commit | cabef01a63a222c6805521c188f24f57e9296152 (patch) | |
tree | 538485877d0f265f47a97b34419adef032fc09d3 /storage | |
parent | 1a37850993a673e79182eb62220d063878d48410 (diff) |
General code healt, nodiscard, range loops etc
Diffstat (limited to 'storage')
10 files changed, 100 insertions, 170 deletions
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 98ee1fbe8e6..16854cd63c6 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -1378,9 +1378,7 @@ std::string StateCheckersTest::testGarbageCollection( configure_stripe(cfg); // Insert after stripe configuration to avoid GC timestamp being implicitly reset BucketDatabase::Entry e(document::BucketId(17, 0)); - e.getBucketInfo().addNode(BucketCopy(prevTimestamp, 0, - api::BucketInfo(3,3,3)), - toVector((uint16_t)0)); + e.getBucketInfo().addNode(BucketCopy(prevTimestamp, 0, api::BucketInfo(3,3,3)), toVector((uint16_t)0)); e.getBucketInfo().setLastGarbageCollectionTime(prevTimestamp); getBucketDatabase().update(e); @@ -1389,8 +1387,7 @@ std::string StateCheckersTest::testGarbageCollection( getDistributorBucketSpace(), statsTracker, makeDocumentBucket(e.getBucketId())); getClock().setAbsoluteTimeInSeconds(nowTimestamp); - return testStateChecker(checker, c, false, PendingMessage(), - includePriority, includeSchedulingPri); + return testStateChecker(checker, c, false, PendingMessage(), includePriority, includeSchedulingPri); } TEST_F(StateCheckersTest, garbage_collection) { @@ -1749,7 +1746,7 @@ TEST_F(StateCheckersTest, stats_updates_for_maximum_time_since_gc_run) { .last_gc_at_time({17, 0}, 100) .runFor({17, 0}); - EXPECT_EQ(runner.stats().max_observed_time_since_last_gc().count(), 1900); + EXPECT_EQ(runner.stats().max_observed_time_since_last_gc(), 1900s); } } diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 2ab37a21ec4..dad6f477d83 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -6,6 +6,8 @@ #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/storageframework/generic/thread/thread.h> #include <tests/distributor/top_level_distributor_test_util.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_document_bucket.h> diff --git a/storage/src/tests/storageframework/thread/tickingthreadtest.cpp b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp index 577e9d128b9..9023f1ad199 100644 --- a/storage/src/tests/storageframework/thread/tickingthreadtest.cpp +++ b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp @@ -32,7 +32,7 @@ struct MyApp : public TickingThread { std::vector<Context> _context; TickingThreadPool::UP _threadPool; - MyApp(int threadCount, bool doCritOverlapTest = false); + explicit MyApp(int threadCount, bool doCritOverlapTest = false); ~MyApp() override; void start(ThreadPool& p) { _threadPool->start(p); } @@ -56,37 +56,30 @@ struct MyApp : public TickingThread { return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; } uint64_t getMinCritTick() { - uint64_t min = std::numeric_limits<uint64_t>().max(); - for (uint32_t i=0; i<_context.size(); ++i) { - min = std::min(min, load_relaxed(_context[i]._critTickCount)); + uint64_t min = std::numeric_limits<uint64_t>::max(); + for (auto & c : _context) { + min = std::min(min, load_relaxed(c._critTickCount)); } return min; } - uint64_t getMinNonCritTick() { - uint64_t min = std::numeric_limits<uint64_t>().max(); - for (uint32_t i=0; i<_context.size(); ++i) { - min = std::min(min, load_relaxed(_context[i]._critTickCount)); - } - return min; - } - uint64_t getTotalCritTicks() const noexcept { + [[nodiscard]] uint64_t getTotalCritTicks() const noexcept { uint64_t total = 0; - for (uint32_t i=0; i<_context.size(); ++i) { - total += load_relaxed(_context[i]._critTickCount); + for (const auto & i : _context) { + total += load_relaxed(i._critTickCount); } return total; } - uint64_t getTotalNonCritTicks() const noexcept { + [[nodiscard]] uint64_t getTotalNonCritTicks() const noexcept { uint64_t total = 0; - for (uint32_t i=0; i<_context.size(); ++i) { - total += load_relaxed(_context[i]._nonCritTickCount); + for (const auto & c : _context) { + total += load_relaxed(c._nonCritTickCount); } return total; } - uint64_t getTotalTicks() const noexcept { + [[nodiscard]] uint64_t getTotalTicks() const noexcept { return getTotalCritTicks() + getTotalNonCritTicks(); } - bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); } + [[nodiscard]] bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); } }; MyApp::MyApp(int threadCount, bool doCritOverlapTest) @@ -215,7 +208,7 @@ RealClock clock; void printTaskInfo(const std::string& task, const char* action) { vespalib::string msg = vespalib::make_string( "%" PRIu64 ": %s %s\n", - clock.getTimeInMicros().getTime(), + vespalib::count_us(clock.getSystemTime().time_since_epoch()), task.c_str(), action); // std::cerr << msg; @@ -229,15 +222,15 @@ struct BroadcastApp : public TickingThread { // Set a huge wait time by default to ensure we have to notify BroadcastApp(); - ~BroadcastApp(); + ~BroadcastApp() override; void start(ThreadPool& p) { _threadPool->start(p); } ThreadWaitInfo doCriticalTick(ThreadIndex) override { if (!_queue.empty()) { - for (uint32_t i=0; i<_queue.size(); ++i) { - printTaskInfo(_queue[i], "activating"); - _active.push_back(_queue[i]); + for (const auto & task : _queue) { + printTaskInfo(task, "activating"); + _active.push_back(task); } _queue.clear(); return ThreadWaitInfo::MORE_WORK_ENQUEUED; @@ -246,9 +239,9 @@ struct BroadcastApp : public TickingThread { } ThreadWaitInfo doNonCriticalTick(ThreadIndex) override { if (!_active.empty()) { - for (uint32_t i=0; i<_active.size(); ++i) { - printTaskInfo(_active[i], "processing"); - _processed.push_back(_active[i]); + for (const auto & task : _active) { + printTaskInfo(task, "processing"); + _processed.push_back(task); } _active.clear(); } diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 8b8801f9cf5..9a98a40e7eb 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -5,7 +5,6 @@ #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/fieldvalue/document.h> -#include <vespa/document/datatype/documenttype.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index cf98585bc82..ecf4e81322f 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -11,6 +11,7 @@ #include <vespa/storage/storageutil/distributorstatecache.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/storageframework/generic/thread/thread.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> @@ -18,6 +19,7 @@ #include <vespa/storageapi/message/stat.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/util/stringfmt.h> +#include <ranges> #include <vespa/config/helper/configgetter.hpp> #include <chrono> #include <thread> @@ -30,8 +32,7 @@ using namespace std::chrono_literals; namespace storage { -BucketManager::BucketManager(const config::ConfigUri & configUri, - ServiceLayerComponentRegister& compReg) +BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerComponentRegister& compReg) : StorageLink("Bucket manager"), framework::StatusReporter("bucketdb", "Bucket database"), _configUri(configUri), @@ -58,8 +59,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, ns.setMinUsedBits(58); _component.getStateUpdater().setReportedNodeState(ns); - auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig( - configUri.getConfigId(), configUri.getContext()); + auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(configUri.getConfigId(), configUri.getContext()); _simulated_processing_delay = std::chrono::milliseconds(std::max(0, server_config->simulatedBucketRequestLatencyMsec)); } @@ -83,10 +83,8 @@ void BucketManager::onClose() } void -BucketManager::print(std::ostream& out, bool verbose, - const std::string& indent) const +BucketManager::print(std::ostream& out, bool ,const std::string& ) const { - (void) verbose; (void) indent; out << "BucketManager()"; } @@ -115,19 +113,15 @@ public: { } - StorBucketDatabase::Decision operator()(uint64_t bucketId, - const StorBucketDatabase::Entry& data) + StorBucketDatabase::Decision operator()(uint64_t bucketId,const StorBucketDatabase::Entry& data) { document::BucketId b(document::BucketId::keyToBucketId(bucketId)); try{ uint16_t i = _state.getOwner(b); auto it = _result.find(i); if constexpr (log) { - LOG(spam, "Bucket %s (reverse %" PRIu64 "), should be handled" - " by distributor %u which we are %sgenerating " - "state for.", - b.toString().c_str(), bucketId, i, - it == _result.end() ? "not " : ""); + LOG(spam, "Bucket %s (reverse %" PRIu64 "), should be handled by distributor %u which we are %sgenerating state for.", + b.toString().c_str(), bucketId, i, it == _result.end() ? "not " : ""); } if (it != _result.end()) { api::RequestBucketInfoReply::Entry entry; @@ -136,17 +130,11 @@ public: it->second.push_back(entry); } } catch (lib::TooFewBucketBitsInUseException& e) { - LOGBP(warning, "Cannot assign bucket %s to a distributor " - " as bucket only specifies %u bits.", - b.toString().c_str(), - b.getUsedBits()); + LOGBP(warning, "Cannot assign bucket %s to a distributor as bucket only specifies %u bits.", + b.toString().c_str(), b.getUsedBits()); } catch (lib::NoDistributorsAvailableException& e) { - LOGBP(warning, "No distributors available while processing " - "request bucket info. Distribution hash: %s, " - "cluster state: %s", - _state.getDistribution().getNodeGraph() - .getDistributionConfigHash().c_str(), - _state.getClusterState().toString().c_str()); + LOGBP(warning, "No distributors available while processing request bucket info. Distribution hash: %s, cluster state: %s", + _state.getDistribution().getNodeGraph().getDistributionConfigHash().c_str(), _state.getClusterState().toString().c_str()); } return StorBucketDatabase::Decision::CONTINUE; } @@ -173,8 +161,7 @@ struct MetricsUpdater { void operator()(document::BucketId::Type bucketId, const StorBucketDatabase::Entry& data) noexcept { - document::BucketId bucket( - document::BucketId::keyToBucketId(bucketId)); + document::BucketId bucket(document::BucketId::keyToBucketId(bucketId)); if (data.valid()) { ++count.buckets; @@ -209,8 +196,7 @@ struct MetricsUpdater { StorBucketDatabase::Entry BucketManager::getBucketInfo(const document::Bucket &bucket) const { - StorBucketDatabase::WrappedEntry entry( - _component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::getBucketInfo")); + StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::getBucketInfo")); return *entry; } @@ -254,8 +240,7 @@ BucketManager::updateMetrics(bool updateDocCount) void BucketManager::update_bucket_db_memory_usage_metrics() { for (auto& space : _component.getBucketSpaceRepo()) { auto bm = _metrics->bucket_spaces.find(space.first); - bm->second->bucket_db_metrics.memory_usage.update( - space.second->bucketDatabase().detailed_memory_usage()); + bm->second->bucket_db_metrics.memory_usage.update(space.second->bucketDatabase().detailed_memory_usage()); } } @@ -266,10 +251,8 @@ void BucketManager::updateMinUsedBits() // When going through to get sizes, we also record min bits MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker()); if (bitTracker.getMinUsedBits() != m.lowestUsedBit) { - NodeStateUpdater::Lock::SP lock( - _component.getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_component.getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState()); bitTracker.setMinUsedBits(m.lowestUsedBit); ns.setMinUsedBits(m.lowestUsedBit); _component.getStateUpdater().setReportedNodeState(ns); @@ -406,7 +389,7 @@ bool BucketManager::onRequestBucketInfo( const std::shared_ptr<api::RequestBucketInfoCommand>& cmd) { LOG(debug, "Got request bucket info command %s", cmd->toString().c_str()); - if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { + if (cmd->getBuckets().empty() && cmd->hasSystemState()) { std::lock_guard guard(_workerLock); _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd); @@ -419,23 +402,17 @@ bool BucketManager::onRequestBucketInfo( BucketSpace bucketSpace(cmd->getBucketSpace()); api::RequestBucketInfoReply::EntryVector info; - if (cmd->getBuckets().size()) { + if (!cmd->getBuckets().empty()) { typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap; - for (uint32_t i = 0; i < cmd->getBuckets().size(); i++) { - BucketMap entries(_component.getBucketDatabase(bucketSpace).getAll( - cmd->getBuckets()[i], - "BucketManager::onRequestBucketInfo")); - for (BucketMap::iterator it = entries.begin(); - it != entries.end(); ++it) - { - info.push_back(api::RequestBucketInfoReply::Entry( - it->first, it->second->getBucketInfo())); + for (auto i : cmd->getBuckets()) { + BucketMap entries(_component.getBucketDatabase(bucketSpace).getAll(i, "BucketManager::onRequestBucketInfo")); + for (auto & entrie : entries) { + info.emplace_back(entrie.first, entrie.second->getBucketInfo()); } } } else { - LOG(error, "We don't support fetching bucket info without bucket " - "list or system state"); + LOG(error, "We don't support fetching bucket info without bucket list or system state"); assert(false); } _metrics->simpleBucketInfoRequestSize.addValue(info.size()); @@ -445,17 +422,14 @@ bool BucketManager::onRequestBucketInfo( LOG(spam, "Returning list of checksums:"); for (const auto & entry : reply->getBucketInfo()) { - LOG(spam, "%s: %s", - entry._bucketId.toString().c_str(), - entry._info.toString().c_str()); + LOG(spam, "%s: %s", entry._bucketId.toString().c_str(), entry._info.toString().c_str()); } sendUp(reply); // Remaining replies dispatched by queueGuard upon function exit. return true; } -BucketManager::ScopedQueueDispatchGuard::ScopedQueueDispatchGuard( - BucketManager& mgr) +BucketManager::ScopedQueueDispatchGuard::ScopedQueueDispatchGuard(BucketManager& mgr) : _mgr(mgr) { _mgr.enterQueueProtectedSection(); @@ -520,16 +494,13 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac LOG(debug, "Processing %zu queued request bucket info commands for bucket space %s. " "Using cluster state '%s' and distribution hash '%s'", - reqs.size(), - bucketSpace.toString().c_str(), - clusterState->toString().c_str(), - our_hash.c_str()); + reqs.size(), bucketSpace.toString().c_str(), clusterState->toString().c_str(), our_hash.c_str()); std::lock_guard clusterStateGuard(_clusterStateLock); - for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) { + for (auto & req : std::ranges::reverse_view(reqs)) { // Currently small requests should not be forwarded to worker thread - assert((*it)->hasSystemState()); - const auto their_hash = (*it)->getDistributionHash(); + assert(req->hasSystemState()); + const auto their_hash = req->getDistributionHash(); std::ostringstream error; if ((clusterState->getVersion() != _last_cluster_state_version_initiated) || @@ -539,13 +510,13 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac // to another cluster state version does not happen atomically. Detect and // gracefully deal with the case where we're not internally in sync. error << "Inconsistent internal cluster state on node during transition; " - << "failing request from distributor " << (*it)->getDistributor() + << "failing request from distributor " << req->getDistributor() << " so it can be retried. Node version is " << clusterState->getVersion() << ", last version seen by the bucket manager is " << _last_cluster_state_version_initiated << ", last internally converged version is " << _last_cluster_state_version_completed; - } else if ((*it)->getSystemState().getVersion() != _last_cluster_state_version_initiated) { + } else if (req->getSystemState().getVersion() != _last_cluster_state_version_initiated) { error << "Ignoring bucket info request for cluster state version " - << (*it)->getSystemState().getVersion() << " as newest " + << req->getSystemState().getVersion() << " as newest " << "version we know of is " << _last_cluster_state_version_initiated; } else if (!their_hash.empty() && their_hash != our_hash) { // Mismatching config hash indicates nodes are out of sync with their config generations @@ -553,21 +524,21 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac << our_hash.c_str() << ", their hash: " << their_hash.c_str() << ")"; } if (error.str().empty()) { - auto result = seenDistributors.insert((*it)->getDistributor()); + auto result = seenDistributors.insert(req->getDistributor()); if (result.second) { - requests[(*it)->getDistributor()] = *it; + requests[req->getDistributor()] = req; continue; } else { error << "There is already a newer bucket info request for this" - << " node from distributor " << (*it)->getDistributor(); + << " node from distributor " << req->getDistributor(); } } // If we get here, message should be failed - auto reply = std::make_shared<api::RequestBucketInfoReply>(**it); + auto reply = std::make_shared<api::RequestBucketInfoReply>(*req); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, error.str())); LOG(debug, "Rejecting request from distributor %u: %s", - (*it)->getDistributor(), + req->getDistributor(), error.str().c_str()); sendUp(reply); } @@ -592,29 +563,24 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac } _metrics->fullBucketInfoRequestSize.addValue(requests.size()); - LOG(debug, "Processing %zu bucket info requests for " - "distributors %s, using system state %s", - requests.size(), distrList.str().c_str(), - clusterState->toString().c_str()); + LOG(debug, "Processing %zu bucket info requests for distributors %s, using system state %s", + requests.size(), distrList.str().c_str(), clusterState->toString().c_str()); framework::MilliSecTimer runStartTime(_component.getClock()); // Don't allow logging to lower performance of inner loop. // Call other type of instance if logging const document::BucketIdFactory& idFac(_component.getBucketIdFactory()); if (LOG_WOULD_LOG(spam)) { - DistributorInfoGatherer<true> builder( - *clusterState, result, idFac, distribution); + DistributorInfoGatherer<true> builder(*clusterState, result, idFac, distribution); _component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder), "BucketManager::processRequestBucketInfoCommands-1"); } else { - DistributorInfoGatherer<false> builder( - *clusterState, result, idFac, distribution); + DistributorInfoGatherer<false> builder(*clusterState, result, idFac, distribution); _component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder), "BucketManager::processRequestBucketInfoCommands-2"); } _metrics->fullBucketInfoLatency.addValue(runStartTime.getElapsedTimeAsDouble()); for (auto& nodeAndCmd : requests) { - auto reply(std::make_shared<api::RequestBucketInfoReply>( - *nodeAndCmd.second)); + auto reply(std::make_shared<api::RequestBucketInfoReply>(*nodeAndCmd.second)); reply->getBucketInfo().swap(result[nodeAndCmd.first]); sendUp(reply); } @@ -653,8 +619,7 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd, uint64_t prevLastModified = 0; { - StorBucketDatabase::WrappedEntry entry( - _component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::verify")); + StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "BucketManager::verify")); if (entry.exist()) { prevLastModified = entry->info.getLastModified(); @@ -670,16 +635,11 @@ BucketManager::verifyAndUpdateLastModified(api::StorageCommand& cmd, } api::StorageReply::UP reply = cmd.makeReply(); - reply->setResult(api::ReturnCode( - api::ReturnCode::STALE_TIMESTAMP, + reply->setResult(api::ReturnCode(api::ReturnCode::STALE_TIMESTAMP, vespalib::make_string( - "Received command %s with a lower/equal timestamp " - " (%" PRIu64 ") than the last operation received for " - "bucket %s, with timestamp %" PRIu64, - cmd.toString().c_str(), - lastModified, - bucket.toString().c_str(), - prevLastModified))); + "Received command %s with a lower/equal timestamp (%" PRIu64 ") than the last " + "operation received for bucket %s, with timestamp %" PRIu64, + cmd.toString().c_str(), lastModified, bucket.toString().c_str(), prevLastModified))); sendUp(api::StorageMessage::SP(reply.release())); @@ -724,10 +684,8 @@ BucketManager::onCreateBucket(const api::CreateBucketCommand::SP& cmd) { MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker()); if (bitTracker.update(cmd->getBucketId())) { - NodeStateUpdater::Lock::SP lock( - _component.getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_component.getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState()); ns.setMinUsedBits(bitTracker.getMinUsedBits()); _component.getStateUpdater().setReportedNodeState(ns); } @@ -740,10 +698,8 @@ BucketManager::onMergeBucket(const api::MergeBucketCommand::SP& cmd) { MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker()); if (bitTracker.update(cmd->getBucketId())) { - NodeStateUpdater::Lock::SP lock( - _component.getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_component.getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component.getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component.getStateUpdater().getReportedNodeState()); ns.setMinUsedBits(bitTracker.getMinUsedBits()); _component.getStateUpdater().setReportedNodeState(ns); } @@ -753,13 +709,7 @@ BucketManager::onMergeBucket(const api::MergeBucketCommand::SP& cmd) bool BucketManager::onRemove(const api::RemoveCommand::SP& cmd) { - if (!verifyAndUpdateLastModified(*cmd, - cmd->getBucket(), - cmd->getTimestamp())) { - return true; - } - - return false; + return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp()); } bool @@ -771,13 +721,7 @@ BucketManager::onRemoveReply(const api::RemoveReply::SP& reply) bool BucketManager::onPut(const api::PutCommand::SP& cmd) { - if (!verifyAndUpdateLastModified(*cmd, - cmd->getBucket(), - cmd->getTimestamp())) { - return true; - } - - return false; + return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp()); } bool @@ -789,13 +733,7 @@ BucketManager::onPutReply(const api::PutReply::SP& reply) bool BucketManager::onUpdate(const api::UpdateCommand::SP& cmd) { - if (!verifyAndUpdateLastModified(*cmd, - cmd->getBucket(), - cmd->getTimestamp())) { - return true; - } - - return false; + return !verifyAndUpdateLastModified(*cmd, cmd->getBucket(), cmd->getTimestamp()); } bool @@ -805,10 +743,8 @@ BucketManager::onUpdateReply(const api::UpdateReply::SP& reply) } bool -BucketManager::onNotifyBucketChangeReply( - const api::NotifyBucketChangeReply::SP& reply) +BucketManager::onNotifyBucketChangeReply(const api::NotifyBucketChangeReply::SP&) { - (void) reply; // Handling bucket change replies is a no-op. return true; } @@ -857,8 +793,7 @@ BucketManager::enqueueAsConflictIfProcessingRequest( { std::lock_guard guard(_queueProcessingLock); if (_requestsCurrentlyProcessing != 0) { - LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", - reply->toString().c_str()); + LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", reply->toString().c_str()); _queuedReplies.push_back(reply); _conflictingBuckets.insert(reply->getBucketId()); return true; diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index fda13e09c45..ce60ec88bff 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -67,7 +67,7 @@ private: size_t _requestsCurrentlyProcessing; ServiceLayerComponent _component; std::shared_ptr<BucketManagerMetrics> _metrics; - framework::Thread::UP _thread; + std::unique_ptr<framework::Thread> _thread; std::chrono::milliseconds _simulated_processing_delay; class ScopedQueueDispatchGuard { diff --git a/storage/src/vespa/storage/common/statusmetricconsumer.cpp b/storage/src/vespa/storage/common/statusmetricconsumer.cpp index e9360c35f3c..8eb3e9f3ab6 100644 --- a/storage/src/vespa/storage/common/statusmetricconsumer.cpp +++ b/storage/src/vespa/storage/common/statusmetricconsumer.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "statusmetricconsumer.h" +#include <vespa/storageframework/generic/clock/clock.h> #include <boost/lexical_cast.hpp> #include <vespa/metrics/jsonwriter.h> #include <vespa/metrics/textwriter.h> diff --git a/storage/src/vespa/storage/distributor/nodeinfo.cpp b/storage/src/vespa/storage/distributor/nodeinfo.cpp index 44a4670bd54..d02a1e1906e 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.cpp +++ b/storage/src/vespa/storage/distributor/nodeinfo.cpp @@ -16,7 +16,7 @@ bool NodeInfo::isBusy(uint16_t idx) const { const SingleNodeInfo& info = getNode(idx); if (info._busyUntilTime.time_since_epoch().count() != 0) { if (_clock.getMonotonicTime() > info._busyUntilTime) { - info._busyUntilTime = framework::MonotonicTimePoint{}; + info._busyUntilTime = vespalib::steady_time(); } else { return true; } diff --git a/storage/src/vespa/storage/distributor/nodeinfo.h b/storage/src/vespa/storage/distributor/nodeinfo.h index 0e1b04561fb..7f0716d7804 100644 --- a/storage/src/vespa/storage/distributor/nodeinfo.h +++ b/storage/src/vespa/storage/distributor/nodeinfo.h @@ -8,8 +8,11 @@ #pragma once #include <vector> -#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/util/time.h> +namespace storage::framework{ + struct Clock; +} namespace storage::distributor { class NodeInfo { @@ -20,7 +23,7 @@ public: bool isBusy(uint16_t idx) const; - void setBusy(uint16_t idx, framework::MonotonicDuration for_duration); + void setBusy(uint16_t idx, vespalib::duration for_duration); void incPending(uint16_t idx); @@ -33,7 +36,7 @@ private: SingleNodeInfo() : _pending(0), _busyUntilTime() {} uint32_t _pending; - mutable framework::MonotonicTimePoint _busyUntilTime; + mutable vespalib::steady_time _busyUntilTime; }; mutable std::vector<SingleNodeInfo> _nodes; diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h index 57eed20f665..801c75322b3 100644 --- a/storage/src/vespa/storageapi/message/bucket.h +++ b/storage/src/vespa/storageapi/message/bucket.h @@ -14,7 +14,7 @@ #include <vespa/storageapi/messageapi/maintenancecommand.h> #include <vespa/document/base/globalid.h> #include <vespa/document/util/printable.h> -#include <vespa/vespalib/util/array.h> +#include <vespa/vespalib/stllike/allocator.h> #include <vespa/storageapi/defs.h> namespace document { class DocumentTypeRepo; } @@ -237,7 +237,7 @@ private: public: explicit GetBucketDiffReply(const GetBucketDiffCommand& cmd); - ~GetBucketDiffReply(); + ~GetBucketDiffReply() override; const std::vector<Node>& getNodes() const { return _nodes; } Timestamp getMaxTimestamp() const { return _maxTimestamp; } @@ -268,14 +268,14 @@ public: const document::DocumentTypeRepo *_repo; Entry(); - Entry(const GetBucketDiffCommand::Entry&); + explicit Entry(const GetBucketDiffCommand::Entry&); Entry(const Entry &); Entry & operator = (const Entry &); Entry(Entry &&) = default; Entry & operator = (Entry &&) = default; - ~Entry(); + ~Entry() override; - bool filled() const; + [[nodiscard]] bool filled() const; void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool operator==(const Entry&) const; }; @@ -358,7 +358,7 @@ public: const std::vector<document::BucketId>& getBuckets() const { return _buckets; } - bool hasSystemState() const { return (_state.get() != 0); } + bool hasSystemState() const { return bool(_state); } uint16_t getDistributor() const { return _distributor; } const lib::ClusterState& getSystemState() const { return *_state; } @@ -387,8 +387,8 @@ public: bool operator==(const Entry& e) const { return (_bucketId == e._bucketId && _info == e._info); } bool operator!=(const Entry& e) const { return !(*this == e); } - Entry() : _bucketId(), _info() {} - Entry(const document::BucketId& id, const BucketInfo& info) + Entry() noexcept : _bucketId(), _info() {} + Entry(const document::BucketId& id, const BucketInfo& info) noexcept : _bucketId(id), _info(info) {} friend std::ostream& operator<<(std::ostream& os, const Entry&); }; @@ -397,7 +397,7 @@ public: bool two_phase_remove_location = false; bool no_implicit_indexing_of_active_buckets = false; }; - using EntryVector = vespalib::Array<Entry>; + using EntryVector = std::vector<Entry, vespalib::allocator_large<Entry>>; private: EntryVector _buckets; bool _full_bucket_fetch; |