diff options
author | Geir Storli <geirst@oath.com> | 2017-11-13 15:25:43 +0000 |
---|---|---|
committer | Geir Storli <geirst@oath.com> | 2017-11-13 15:25:43 +0000 |
commit | f74af1fbe7eb1be0a485942506f4458a3e183955 (patch) | |
tree | a43a700871a985349ca16dc79bd9451ab1461754 /storage | |
parent | 9bb55560ce93affa2685f7c787339b9994a1c5f8 (diff) |
Make StorageBucketDBInitializer bucket space aware.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp | 153 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h | 14 |
2 files changed, 113 insertions, 54 deletions
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index 3a832f0fe3b..15338d733b3 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -5,6 +5,7 @@ #include "config-stor-bucket-init.h" #include "storbucketdb.h" #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/storage/storageserver/storagemetricsset.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/io/fileutil.h> @@ -65,7 +66,7 @@ StorageBucketDBInitializer::System::System( : _doneInitializeHandler(doneInitializeHandler), _component(compReg, "storagebucketdbinitializer"), _partitions(partitions), - _bucketDatabase(_component.getBucketDatabase(BucketSpace::placeHolder())), + _bucketSpaceRepo(_component.getBucketSpaceRepo()), _nodeIndex(_component.getIndex()), _distribution(*_component.getDistribution()), _nodeState() @@ -82,6 +83,12 @@ StorageBucketDBInitializer::System::System( } } +StorBucketDatabase & +StorageBucketDBInitializer::System::getBucketDatabase(document::BucketSpace bucketSpace) const +{ + return _component.getBucketDatabase(bucketSpace); +} + StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component) : metrics::MetricSet("dbinit", "", "Metrics for the storage bucket database initializer"), @@ -134,7 +141,10 @@ StorageBucketDBInitializer::StorageBucketDBInitializer( // Initialize read state for disks being available for (uint32_t i=0; i<_system._partitions.size(); ++i) { if (!_system._partitions[i].isUp()) continue; - _readState[i] = BucketReadState::UP(new BucketReadState); + _readState[i] = std::make_unique<BucketSpaceReadState>(); + for (const auto &elem : _system._bucketSpaceRepo) { + _readState[i]->insert(std::make_pair(elem.first, std::make_unique<BucketReadState>())); + } _state._dirsToList += 1; } _system._component.registerStatusPage(*this); @@ -155,9 +165,14 @@ StorageBucketDBInitializer::onOpen() // Trigger bucket database initialization for (uint32_t i=0; i<_system._partitions.size(); ++i) { if (!_system._partitions[i].isUp()) continue; - ReadBucketList::SP msg(new ReadBucketList(BucketSpace::placeHolder(), spi::PartitionId(i))); - _state._lists[msg->getMsgId()] = msg; - sendDown(msg); + assert(_readState[i].get() != nullptr); + const BucketSpaceReadState &spaceState = *_readState[i]; + for (const auto &stateElem : spaceState) { + document::BucketSpace bucketSpace = stateElem.first; + ReadBucketList::SP msg(new ReadBucketList(bucketSpace, spi::PartitionId(i))); + _state._lists[msg->getMsgId()] = msg; + sendDown(msg); + } } framework::MilliSecTime maxProcessingTime(10); framework::MilliSecTime sleepTime(1000); @@ -220,6 +235,26 @@ StorageBucketDBInitializer::print( out << "StorageBucketDBInitializer()"; } +namespace { + +size_t +notDoneCount(const StorageBucketDBInitializer::ReadState &readState) +{ + size_t result = 0; + for (const auto &elem : readState) { + if (elem.get() != nullptr) { + for (const auto &stateElem : *elem) { + if (!stateElem.second->_done) { + ++result; + } + } + } + } + return result; +} + +} + void StorageBucketDBInitializer::reportHtmlStatus( std::ostream& out, const framework::HttpUrlPath&) const @@ -261,10 +296,7 @@ StorageBucketDBInitializer::reportHtmlStatus( out << " " << _state._infoRequests.size() << " info requests pending.<br/>\n"; } - uint32_t incompleteScan = 0; - for (uint32_t i=0; i<_readState.size(); ++i) { - if (_readState[i].get() != 0 && !_readState[i]->_done) ++incompleteScan; - } + uint32_t incompleteScan = notDoneCount(_readState); if (incompleteScan == 0) { out << " Done iterating bucket database to generate info " << "requests.<br/>\n"; @@ -304,29 +336,31 @@ StorageBucketDBInitializer::reportHtmlStatus( out << " <h3>Disk " << i << " is down</h3>\n"; continue; } - BucketReadState& state(*_readState[i]); - out << " <h3>Disk " << i << "</h3>\n"; - out << " Pending info requests: " << pendingCounts[i] << " ("; - if (state._pending.empty()) { - out << "none"; - } else { - bool first = true; - for (BucketSet::const_iterator it = state._pending.begin(); - it != state._pending.end(); ++it) - { - if (!first) { - out << ", "; - } else { - first = false; + const BucketSpaceReadState& spaceState(*_readState[i]); + for (const auto &stateElem : spaceState) { + const BucketReadState &state = *stateElem.second; + out << " <h3>Disk " << i << ", bucket space " << stateElem.first.getId() << "</h3>\n"; + out << " Pending info requests: " << pendingCounts[i] << " ("; + if (state._pending.empty()) { + out << "none"; + } else { + bool first = true; + for (BucketSet::const_iterator it = state._pending.begin(); + it != state._pending.end(); ++it) { + if (!first) { + out << ", "; + } else { + first = false; + } + out << *it; } - out << *it; } + out << ")<br/>\n"; + out << " Bucked database iterator: " << state._databaseIterator + << "<br/>\n"; + out << " Done iterating bucket database. " + << (state._done ? "true" : "false") << "<br/>\n"; } - out << ")<br/>\n"; - out << " Bucked database iterator: " << state._databaseIterator - << "<br/>\n"; - out << " Done iterating bucket database. " - << (state._done ? "true" : "false") << "<br/>\n"; } for (std::map<Disk, uint32_t>::iterator it = pendingCounts.begin(); it != pendingCounts.end(); ++it) @@ -342,7 +376,7 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket, api::BucketInfo bucketInfo) { document::BucketId bucketId(bucket.getBucketId()); - StorBucketDatabase::WrappedEntry entry(_system._bucketDatabase.get( + StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(bucket.getBucketSpace()).get( bucketId, "StorageBucketDBInitializer::registerBucket", StorBucketDatabase::CREATE_IF_NONEXISTING)); if (bucketInfo.valid()) { @@ -459,9 +493,11 @@ namespace { // Always called from worker thread. It holds worker monitor. void -StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk) +StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk, document::BucketSpace bucketSpace) { - BucketReadState& state(*_readState[disk]); + auto itr = _readState[disk]->find(bucketSpace); + assert(itr != _readState[disk]->end()); + BucketReadState& state = *itr->second; if (state._done || state._pending.size() >= _config._maxPendingInfoReadsPerDisk) { @@ -473,7 +509,7 @@ StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk) NextBucketOnDiskFinder finder(disk, state._databaseIterator, count); LOG(spam, "Iterating bucket db further. Starting at iterator %s", state._databaseIterator.toString().c_str()); - _system._bucketDatabase.all(finder, + _system.getBucketDatabase(bucketSpace).all(finder, "StorageBucketDBInitializer::readBucketInfo", state._databaseIterator.stripUnused().toKey()); if (finder._alreadySet > 0) { @@ -481,7 +517,7 @@ StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk) _state._infoSetByLoad += finder._alreadySet; } for (uint32_t i=0; i<finder._next.size(); ++i) { - document::Bucket bucket(BucketSpace::placeHolder(), finder._next[i]); + document::Bucket bucket(bucketSpace, finder._next[i]); ReadBucketInfo::SP cmd(new ReadBucketInfo(bucket)); cmd->setPriority(_config._infoReadPriority); state._pending.insert(finder._next[i]); @@ -593,7 +629,7 @@ StorageBucketDBInitializer::handleReadBucketListReply( handleListingCompleted(); } checkIfDone(); - sendReadBucketInfo(reply.getPartition()); + sendReadBucketInfo(reply.getPartition(), reply.getBucketSpace()); } // Always called from worker thread. It holds worker monitor. @@ -601,12 +637,13 @@ void StorageBucketDBInitializer::handleReadBucketInfoReply( ReadBucketInfoReply& reply) { + document::BucketSpace bucketSpace = reply.getBucket().getBucketSpace(); if (reply.getResult().failed()) { LOGBP(warning, "Deleting %s from bucket database. Cannot use it as we " "failed to read bucket info for it: %s", reply.getBucketId().toString().c_str(), reply.getResult().toString().c_str()); - _system._bucketDatabase.erase(reply.getBucketId(), + _system.getBucketDatabase(bucketSpace).erase(reply.getBucketId(), "dbinit.failedreply"); } _metrics._infoReadCount.inc(); @@ -622,7 +659,9 @@ StorageBucketDBInitializer::handleReadBucketInfoReply( } else { uint32_t disk(it->second); _state._infoRequests.erase(it->first); - BucketReadState& state(*_readState[disk]); + auto itr = _readState[disk]->find(bucketSpace); + assert(itr != _readState[disk]->end()); + BucketReadState& state = *itr->second; BucketSet::iterator it2(state._pending.find(reply.getBucketId())); if (it2 == state._pending.end()) { LOGBP(warning, "Got bucket info reply for %s that was registered " @@ -632,12 +671,12 @@ StorageBucketDBInitializer::handleReadBucketInfoReply( state._pending.erase(reply.getBucketId()); LOG(spam, "Got info reply for %s: %s", reply.getBucketId().toString().c_str(), - _system._bucketDatabase.get( + _system.getBucketDatabase(reply.getBucket().getBucketSpace()).get( reply.getBucketId(), "dbinit.inforeply") ->getBucketInfo().toString().c_str()); } checkIfDone(); - sendReadBucketInfo(spi::PartitionId(disk)); + sendReadBucketInfo(spi::PartitionId(disk), bucketSpace); } } @@ -661,7 +700,7 @@ StorageBucketDBInitializer::handleInternalBucketJoinReply( LOG(debug, "Completed internal bucket join for %s. Got bucket info %s", reply.getBucketId().toString().c_str(), reply.getBucketInfo().toString().c_str()); - StorBucketDatabase::WrappedEntry entry(_system._bucketDatabase.get( + StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(reply.getBucket().getBucketSpace()).get( reply.getBucketId(), "StorageBucketDBInitializer::onInternalBucketJoinReply")); entry->setBucketInfo(reply.getBucketInfo()); @@ -674,6 +713,16 @@ StorageBucketDBInitializer::handleInternalBucketJoinReply( checkIfDone(); } +namespace { + +bool +isDone(const StorageBucketDBInitializer::ReadState &readState) +{ + return notDoneCount(readState) == 0; +} + +} + // Always called from worker thread. It holds worker monitor. void StorageBucketDBInitializer::checkIfDone() @@ -681,8 +730,8 @@ StorageBucketDBInitializer::checkIfDone() if (_state._dirsListed < _state._dirsToList) return; if (!_state._infoRequests.empty()) return; if (!_state._joins.empty()) return; - for (uint32_t i=0; i<_readState.size(); ++i) { - if (_readState[i].get() != 0 && !_readState[i]->_done) return; + if (!isDone(_readState)) { + return; } _state._doneInitializing = true; _system._doneInitializeHandler.notifyDoneInitializing(); @@ -698,17 +747,19 @@ StorageBucketDBInitializer::calculateMinProgressFromDiskIterators() const if (_readState[disk].get() == 0) { continue; } - const BucketReadState& state(*_readState[disk]); - document::BucketId bid(state._databaseIterator); + for (const auto &stateElem : *_readState[disk]) { + const BucketReadState &state = *stateElem.second; + document::BucketId bid(state._databaseIterator); - double progress; - if (!state._done) { - progress = BucketProgressCalculator::calculateProgress(bid); - } else { - progress = 1.0; - } + double progress; + if (!state._done) { + progress = BucketProgressCalculator::calculateProgress(bid); + } else { + progress = 1.0; + } - minProgress = std::min(minProgress, progress); + minProgress = std::min(minProgress, progress); + } } //std::cerr << "minProgress: " << minProgress << "\n"; return minProgress; diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h index 99f273a384a..15fbd5172cb 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h @@ -51,6 +51,7 @@ #include <vespa/vdslib/state/nodestate.h> #include <vespa/config/subscription/configuri.h> #include <list> +#include <unordered_map> namespace storage { @@ -77,7 +78,7 @@ class StorageBucketDBInitializer : public StorageLink, DoneInitializeHandler& _doneInitializeHandler; ServiceLayerComponent _component; const spi::PartitionStateList& _partitions; - StorBucketDatabase& _bucketDatabase; + const ContentBucketSpaceRepo& _bucketSpaceRepo; uint32_t _nodeIndex; lib::Distribution& _distribution; lib::NodeState _nodeState; // Disk info for ideal state calculations @@ -87,6 +88,8 @@ class StorageBucketDBInitializer : public StorageLink, DoneInitializeHandler& doneInitializeHandler, ServiceLayerComponentRegister&, const Config&); + + StorBucketDatabase &getBucketDatabase(document::BucketSpace bucketSpace) const; }; struct Metrics : public metrics::MetricSet { metrics::LongCountMetric _wrongDisk; @@ -126,12 +129,17 @@ class StorageBucketDBInitializer : public StorageLink, ~GlobalState(); }; +public: + using BucketSpaceReadState = std::unordered_map<document::BucketSpace, + std::unique_ptr<BucketReadState>, document::BucketSpace::hash>; + using ReadState = std::vector<std::unique_ptr<BucketSpaceReadState>>; +private: Config _config; System _system; Metrics _metrics; GlobalState _state; - std::vector<std::unique_ptr<BucketReadState>> _readState; + ReadState _readState; public: StorageBucketDBInitializer(const config::ConfigUri&, @@ -186,7 +194,7 @@ public: * Sends more read bucket info to a given disk. Lock must already be taken. * Will be released by function prior to sending messages down. */ - void sendReadBucketInfo(spi::PartitionId); + void sendReadBucketInfo(spi::PartitionId, document::BucketSpace bucketSpace); /** Check whether initialization is complete. Should hold lock to call it.*/ void checkIfDone(); |