diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2017-11-03 16:21:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-03 16:21:57 +0100 |
commit | dc7b61ade7a38dcd34f3ca1c6c3745cda5006d85 (patch) | |
tree | 3cb11220c184e608dbad3f8444a169cd100c048e /storage | |
parent | 5992380137bfda01dc0ca4677145682d7a650636 (diff) | |
parent | eb7ea0944deb34c32d08f56415ba04ffdbbf7a4f (diff) |
Merge pull request #4005 from vespa-engine/geirst/reduce-use-of-place-holder-bucket-space-in-content-layer-2
Geirst/reduce use of place holder bucket space in content layer 2
Diffstat (limited to 'storage')
4 files changed, 50 insertions, 35 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 65731ed64da..5e7cf4af046 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -227,7 +227,7 @@ BucketManager::updateMetrics(bool updateDocCount) uint32_t diskCount = _component.getDiskCount(); if (!updateDocCount || _doneInitialized) { MetricsUpdater m(diskCount); - _component.getBucketSpaceRepo().forEachBucket( + _component.getBucketSpaceRepo().forEachBucketChunked( m, "BucketManager::updateMetrics"); if (updateDocCount) { for (uint16_t i = 0; i< diskCount; i++) { @@ -244,7 +244,7 @@ BucketManager::updateMetrics(bool updateDocCount) void BucketManager::updateMinUsedBits() { MetricsUpdater m(_component.getDiskCount()); - _component.getBucketSpaceRepo().forEachBucket( + _component.getBucketSpaceRepo().forEachBucketChunked( m, "BucketManager::updateMetrics"); // When going through to get sizes, we also record min bits MinimumUsedBitsTracker& bitTracker(_component.getMinUsedBitsTracker()); @@ -266,20 +266,20 @@ void BucketManager::run(framework::ThreadHandle& thread) framework::MilliSecTime timeToCheckMinUsedBits(0); while (!thread.interrupted()) { bool didWork = false; - BIList infoReqs; + BucketInfoRequestMap infoReqs; { vespalib::MonitorGuard monitor(_workerMonitor); infoReqs.swap(_bucketInfoRequests); } - didWork |= processRequestBucketInfoCommands(infoReqs); + for (auto &req : infoReqs) { + didWork |= processRequestBucketInfoCommands(req.first, req.second); + } { vespalib::MonitorGuard monitor(_workerMonitor); - if (!infoReqs.empty()) { - infoReqs.insert(infoReqs.end(), - _bucketInfoRequests.begin(), _bucketInfoRequests.end()); - _bucketInfoRequests.swap(infoReqs); + for (const auto &req : infoReqs) { + assert(req.second.empty()); } if (!didWork) { monitor.wait(1000); @@ -343,7 +343,7 @@ BucketManager::reportStatus(std::ostream& out, framework::PartlyXmlStatusReporter xmlReporter(*this, out, path); xmlReporter << vespalib::xml::XmlTag("buckets"); BucketDBDumper dumper(xmlReporter.getStream()); - _component.getBucketSpaceRepo().forEachBucket( + _component.getBucketSpaceRepo().forEachBucketChunked( dumper, "BucketManager::reportStatus"); xmlReporter << vespalib::xml::XmlEndTag(); } else { @@ -362,7 +362,7 @@ BucketManager::dump(std::ostream& out) const { vespalib::XmlOutputStream xos(out); BucketDBDumper dumper(xos); - _component.getBucketSpaceRepo().forEachBucket(dumper, "BucketManager::dump"); + _component.getBucketSpaceRepo().forEachBucketChunked(dumper, "BucketManager::dump"); } @@ -394,7 +394,7 @@ bool BucketManager::onRequestBucketInfo( if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { vespalib::MonitorGuard monitor(_workerMonitor); - _bucketInfoRequests.push_back(cmd); + _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd); monitor.signal(); LOG(spam, "Scheduled request bucket info request for retrieval"); return true; @@ -498,7 +498,8 @@ BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) } bool -BucketManager::processRequestBucketInfoCommands(BIList& reqs) +BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpace, + BucketInfoRequestList &reqs) { if (reqs.empty()) return false; @@ -529,7 +530,7 @@ BucketManager::processRequestBucketInfoCommands(BIList& reqs) our_hash.c_str()); vespalib::LockGuard lock(_clusterStateLock); - for (BIList::reverse_iterator it = reqs.rbegin(); it != reqs.rend(); ++it) { + for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) { // Currently small requests should not be forwarded to worker thread assert((*it)->hasSystemState()); const auto their_hash = normalizer.normalize( @@ -602,12 +603,12 @@ BucketManager::processRequestBucketInfoCommands(BIList& reqs) if (LOG_WOULD_LOG(spam)) { DistributorInfoGatherer<true> builder( *clusterState, result, idFac, distribution); - _component.getBucketDatabase(BucketSpace::placeHolder()).chunkedAll(builder, + _component.getBucketDatabase(bucketSpace).chunkedAll(builder, "BucketManager::processRequestBucketInfoCommands-1"); } else { DistributorInfoGatherer<false> builder( *clusterState, result, idFac, distribution); - _component.getBucketDatabase(BucketSpace::placeHolder()).chunkedAll(builder, + _component.getBucketDatabase(bucketSpace).chunkedAll(builder, "BucketManager::processRequestBucketInfoCommands-2"); } _metrics->fullBucketInfoLatency.addValue( diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index c680ff7ed6c..3b71230a8ed 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -12,20 +12,21 @@ #pragma once -#include <vespa/storage/bucketdb/config-stor-bucketdb.h> -#include "storbucketdb.h" #include "bucketmanagermetrics.h" +#include "storbucketdb.h" +#include <vespa/config/subscription/configuri.h> +#include <vespa/storage/bucketdb/config-stor-bucketdb.h> #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/common/storagelinkqueued.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/storageframework/generic/memory/memorymanagerinterface.h> -#include <vespa/storageframework/generic/status/statusreporter.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/storageframework/generic/status/statusreporter.h> -#include <vespa/storageapi/message/bucket.h> -#include <vespa/config/subscription/configuri.h> -#include <unordered_set> #include <list> +#include <unordered_map> +#include <unordered_set> namespace storage { @@ -34,16 +35,19 @@ class BucketManager : public StorageLinkQueued, private framework::Runnable, private framework::MetricUpdateHook { +public: /** Type used for message queues */ - typedef std::list<std::shared_ptr<api::StorageCommand> > CommandList; - typedef std::list<std::shared_ptr<api::RequestBucketInfoCommand> > BIList; + using CommandList = std::list<std::shared_ptr<api::StorageCommand>>; + using BucketInfoRequestList = std::list<std::shared_ptr<api::RequestBucketInfoCommand>>; + using BucketInfoRequestMap = std::unordered_map<document::BucketSpace, BucketInfoRequestList, document::BucketSpace::hash>; +private: config::ConfigUri _configUri; uint32_t _chunkLevel; mutable vespalib::Lock _stateAccess; framework::MemoryToken::UP _bucketDBMemoryToken; - BIList _bucketInfoRequests; + BucketInfoRequestMap _bucketInfoRequests; /** * We have our own thread running, which we use to send messages down. @@ -128,7 +132,8 @@ private: void updateMinUsedBits(); bool onRequestBucketInfo(const std::shared_ptr<api::RequestBucketInfoCommand>&) override; - bool processRequestBucketInfoCommands(BIList&); + bool processRequestBucketInfoCommands(document::BucketSpace bucketSpace, + BucketInfoRequestList &reqs); /** * Enqueue reply and add its bucket to the set of conflicting buckets iff diff --git a/storage/src/vespa/storage/common/content_bucket_space_repo.h b/storage/src/vespa/storage/common/content_bucket_space_repo.h index f6afa6002eb..390cfc15f5d 100644 --- a/storage/src/vespa/storage/common/content_bucket_space_repo.h +++ b/storage/src/vespa/storage/common/content_bucket_space_repo.h @@ -29,6 +29,14 @@ public: void forEachBucket(Functor &functor, const char *clientId) const { for (const auto &elem : _map) { + elem.second->bucketDatabase().all(functor, clientId); + } + } + + template <typename Functor> + void forEachBucketChunked(Functor &functor, + const char *clientId) const { + for (const auto &elem : _map) { elem.second->bucketDatabase().chunkedAll(functor, clientId); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f1daa42ca39..88a7343f8c8 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -2,20 +2,21 @@ #include "filestormanager.h" -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/multioperation.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/removelocation.h> -#include <vespa/storageapi/message/state.h> +#include <vespa/storage/bucketdb/lockablemap.hpp> #include <vespa/storage/common/bucketmessages.h> +#include <vespa/storage/common/bucketoperationlogger.h> +#include <vespa/storage/common/content_bucket_space_repo.h> +#include <vespa/storage/common/messagebucket.h> #include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/storageutil/log.h> -#include <vespa/storage/common/messagebucket.h> -#include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storageapi/message/batch.h> -#include <vespa/storage/common/bucketoperationlogger.h> -#include <vespa/storage/bucketdb/lockablemap.hpp> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/multioperation.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/state.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> @@ -974,7 +975,7 @@ FileStorManager::updateState() if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database"); Deactivator deactivator; - _component.getBucketDatabase(BucketSpace::placeHolder()).all(deactivator, "FileStorManager::updateState"); + _component.getBucketSpaceRepo().forEachBucket(deactivator, "FileStorManager::updateState"); } _provider->setClusterState(spiState); _nodeUpInLastNodeStateSeenByProvider = nodeUp; |