Diffstat (limited to 'storage')
7 files changed, 123 insertions, 119 deletions
diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp
index b78be81c9de..0f84a2d0e38 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.cpp
+++ b/storage/src/vespa/storage/common/content_bucket_space.cpp
@@ -5,8 +5,24 @@
 namespace storage {
 
 ContentBucketSpace::ContentBucketSpace()
-    : _bucketDatabase()
+    : _bucketDatabase(),
+      _lock(),
+      _distribution()
 {
 }
 
+void
+ContentBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution)
+{
+    std::lock_guard<std::mutex> guard(_lock);
+    _distribution = std::move(distribution);
+}
+
+std::shared_ptr<const lib::Distribution>
+ContentBucketSpace::getDistribution() const
+{
+    std::lock_guard<std::mutex> guard(_lock);
+    return _distribution;
+}
+
 }
diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h
index 2efb2eca06d..65524121ca6 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.h
+++ b/storage/src/vespa/storage/common/content_bucket_space.h
@@ -2,20 +2,27 @@
 #pragma once
 
 #include <vespa/storage/bucketdb/storbucketdb.h>
+#include <mutex>
 
 namespace storage {
 
+namespace lib { class Distribution; }
+
 /**
  * Class representing a bucket space (with associated bucket database) on a content node.
  */
 class ContentBucketSpace {
 private:
     StorBucketDatabase _bucketDatabase;
+    mutable std::mutex _lock;
+    std::shared_ptr<const lib::Distribution> _distribution;
 
 public:
     using UP = std::unique_ptr<ContentBucketSpace>;
     ContentBucketSpace();
     StorBucketDatabase &bucketDatabase() { return _bucketDatabase; }
+    void setDistribution(std::shared_ptr<const lib::Distribution> distribution);
+    std::shared_ptr<const lib::Distribution> getDistribution() const;
 };
 
 }
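The two files above give each ContentBucketSpace its own distribution config, guarded by a mutex: the setter swaps in a new shared_ptr under the lock, and the getter copies the pointer out under the same lock, so a reader keeps a stable, immutable snapshot even if reconfiguration replaces the distribution a moment later. A minimal stand-alone sketch of that pattern, with a hypothetical Config type standing in for lib::Distribution:

    #include <memory>
    #include <mutex>

    // Stand-in for lib::Distribution: immutable once constructed.
    struct Config { int redundancy; };

    class ConfigHolder {
    private:
        mutable std::mutex _lock;
        std::shared_ptr<const Config> _config;
    public:
        // Writer: install a new snapshot under the lock.
        void set(std::shared_ptr<const Config> config) {
            std::lock_guard<std::mutex> guard(_lock);
            _config = std::move(config);
        }
        // Reader: copying the shared_ptr under the lock bumps the refcount,
        // so the returned snapshot outlives any concurrent set().
        std::shared_ptr<const Config> get() const {
            std::lock_guard<std::mutex> guard(_lock);
            return _config;
        }
    };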
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
index da734c07c2d..6fd75e06d9d 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
@@ -36,4 +36,14 @@ ServiceLayerComponentRegisterImpl::setDiskCount(uint16_t count)
     }
 }
 
+void
+ServiceLayerComponentRegisterImpl::setDistribution(lib::Distribution::SP distribution)
+{
+    // For now, copy distribution to all content bucket spaces
+    for (const auto &elem : _bucketSpaceRepo) {
+        elem.second->setDistribution(distribution);
+    }
+    StorageComponentRegisterImpl::setDistribution(distribution);
+}
+
 } // storage
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
index 5b3e54e3831..df4047c92c3 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
@@ -37,6 +37,7 @@ public:
 
     void registerServiceLayerComponent(ServiceLayerManagedComponent&) override;
     void setDiskCount(uint16_t count);
+    virtual void setDistribution(lib::Distribution::SP distribution) override;
 };
 
 } // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index dfd04b271b2..cf41a297541 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -978,7 +978,7 @@ FileStorManager::updateState()
     }
     for (const auto &elem : _component.getBucketSpaceRepo()) {
         BucketSpace bucketSpace(elem.first);
-        spi::ClusterState spiState(*state, _component.getIndex(), *_component.getDistribution());
+        spi::ClusterState spiState(*state, _component.getIndex(), *elem.second->getDistribution());
         _provider->setClusterState(bucketSpace, spiState);
     }
     _nodeUpInLastNodeStateSeenByProvider = nodeUp;
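setDistribution() in the register fans the same distribution out to every content bucket space before delegating to the base class, and FileStorManager::updateState() now reads it back per space instead of using the component-wide copy. Because the distribution is shared and immutable, the fan-out copies only a pointer; a rough stand-alone model (Space, DistributionConfig, and fanOut are illustrative stand-ins, not Vespa types):

    #include <map>
    #include <memory>
    #include <string>

    using Space = std::string;
    struct DistributionConfig { int redundancy; };

    // Every space currently receives the same config object; assigning the
    // shared_ptr only bumps a refcount, and a later change can hand each
    // space its own instance without touching the callers.
    void fanOut(std::map<Space, std::shared_ptr<const DistributionConfig>>& spaces,
                std::shared_ptr<const DistributionConfig> cfg) {
        for (auto& entry : spaces) {
            entry.second = cfg;
        }
    }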
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
index 5a3c7e5c35c..b4c7d1e3e80 100644
--- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
+++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
@@ -19,31 +19,6 @@ using document::BucketSpace;
 namespace storage {
 
 namespace {
-    /*
-    std::string printDate(time_t time) {
-        char date[26];
-        struct tm datestruct;
-        struct tm* datestructptr = gmtime_r(&time, &datestruct);
-        assert(datestructptr);
-        char* result = asctime_r(&datestruct, date);
-        size_t size = strlen(result);
-        while (size > 0) {
-            bool stop = false;
-            switch (result[size - 1]) {
-                case '\n':
-                case '\r':
-                case '\f':
-                case '\t':
-                    --size;
-                default:
-                    stop = true;
-                    break;
-            }
-            if (stop) break;
-        }
-        return std::string(result, size);
-    }
-    */
 
     std::string printMinutesOfDay(uint32_t minutesOfDay) {
         std::ostringstream ost;
@@ -131,7 +106,7 @@ struct NextEntryFinder {
             _first = false;
             return StorBucketDatabase::CONTINUE;
         } else {
-            _next.reset(new document::BucketId(bucket));
+            _next = std::make_unique<document::BucketId>(bucket);
             return StorBucketDatabase::ABORT;
         }
     }
@@ -145,48 +120,58 @@ std::unique_ptr<document::BucketId> getNextId(StorBucketDatabase& database,
     database.each(proc, "BucketIntegrityChecker::getNextId", last.toKey());
     return std::move(proc._next);
 }
+
+bool allBucketSpacesExhausted(size_t index, const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces) noexcept {
+    return (index == bucketSpaces.size() - 1);
+}
+
 } // End of anonymous namespace
 
-document::BucketId
-BucketIntegrityChecker::DiskData::iterate(StorBucketDatabase& bucketDatabase)
+document::Bucket
+BucketIntegrityChecker::DiskData::iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces,
+                                          const ContentBucketSpaceRepo& bucketSpaceRepo)
 {
     static uint32_t i=0;
-        // Resend failed buckets once in a while
-    if (failedRepairs.size() > 0 && ++i % 10 == 9)
-    {
-        document::BucketId bid(failedRepairs.front());
+    // Resend failed buckets once in a while
+    if (!failedRepairs.empty() && ++i % 10 == 9) {
+        document::Bucket bucket(failedRepairs.front());
         LOG(spam, "Scheduling next bucket %s from failed repairs list",
-            bid.toString().c_str());
+            bucket.toString().c_str());
         failedRepairs.pop_front();
         ++retriedBuckets;
-        return bid;
+        return bucket;
     }
     if (state == NOT_STARTED) {
-        // Guarantueed to be before all buckets.
+        // Guaranteed to be before all buckets.
         currentBucket = document::BucketId(0, 0);
+        currentBucketSpaceIndex = 0;
     }
-    if (state != DONE) {
-        std::unique_ptr<document::BucketId> bid(
-                getNextId(bucketDatabase, currentBucket, disk));
-        if (bid.get()) {
+    while (state != DONE) {
+        const auto currentSpace = bucketSpaces[currentBucketSpaceIndex];
+        const auto bid = getNextId(bucketSpaceRepo.get(currentSpace).bucketDatabase(), currentBucket, disk);
+        if (bid) {
             state = IN_PROGRESS;
             currentBucket = *bid;
-            return currentBucket;
-        } else {
+            return document::Bucket(currentSpace, currentBucket);
+        } else if (allBucketSpacesExhausted(currentBucketSpaceIndex, bucketSpaces)) {
             state = DONE;
+            break;
+        } else {
+            ++currentBucketSpaceIndex;
+            currentBucket = document::BucketId(0, 0);
         }
     }
-        // If we didn't schedule repaired, but we ended up not having any other,
-        // take repaired once anyways
-    if (failedRepairs.size() > 0) {
-        document::BucketId bid(failedRepairs.front());
+    // If we didn't schedule repaired, but we ended up not having any other,
+    // take repaired once anyways
+    if (!failedRepairs.empty()) {
+        document::Bucket bucket(failedRepairs.front());
         LOG(spam, "Done iterating, scheduling next bucket %s from failed "
-            "repairs list", bid.toString().c_str());
+            "repairs list", bucket.toString().c_str());
         failedRepairs.pop_front();
         ++retriedBuckets;
-        return bid;
+        return bucket;
     }
-    return document::BucketId(0, 0);
+    return document::Bucket(bucketSpaces[currentBucketSpaceIndex], document::BucketId(0, 0));
 }
@@ -196,7 +181,9 @@ BucketIntegrityChecker::BucketIntegrityChecker(
     Runnable(),
     framework::HtmlStatusReporter("bucketintegritychecker",
                                   "Bucket integrity checker"),
+    _component(compReg, "bucketintegritychecker"),
     _cycleCount(0),
+    _bucketSpaces(_component.getBucketSpaceRepo().getBucketSpaces()),
     _status(),
     _lastCycleStart(0),
     _cycleStartBucketCount(0),
@@ -205,19 +192,18 @@ BucketIntegrityChecker::BucketIntegrityChecker(
     _currentRunWithFullVerification(false),
     _verifyAllRepairs(false),
     _scheduleOptions(),
-    _systemState(),
     _wait(),
     _configFetcher(configUri.getContext()),
-    _maxThreadWaitTime(60 * 1000),
-    _component(compReg, "bucketintegritychecker")
+    _maxThreadWaitTime(60 * 1000)
 {
+    assert(!_bucketSpaces.empty());
     LOG(debug, "Configuring bucket integrity checker to work with %u disks.",
         _component.getDiskCount());
     _status.resize(_component.getDiskCount());
     for (uint16_t i=0; i<_component.getDiskCount(); ++i) {
         _status[i].disk = i;
     }
-    if (_status.size() == 0) {
+    if (_status.empty()) {
        throw vespalib::IllegalStateException(
                "Cannot have storage with no disks.", VESPA_STRLOC);
     }
@@ -254,10 +240,10 @@ BucketIntegrityChecker::~BucketIntegrityChecker()
 void
 BucketIntegrityChecker::onClose()
 {
-        // Avoid getting config during shutdown
+    // Avoid getting config during shutdown
     _configFetcher.close();
-        // Close thread to ensure we don't send anything more down after
-    if (_thread.get() != 0) {
+    // Close thread to ensure we don't send anything more down after
+    if (_thread) {
         LOG(debug, "Waiting for bucket integrity worker thread to close.");
         _thread->interruptAndJoin(&_wait);
         LOG(debug, "Bucket integrity worker thread closed.");
@@ -367,18 +353,20 @@ bool
 BucketIntegrityChecker::onInternalReply(
         const std::shared_ptr<api::InternalReply>& internalReply)
 {
-        // We only care about repair bucket replies
-    shared_ptr<RepairBucketReply> reply(
-            std::dynamic_pointer_cast<RepairBucketReply>(internalReply));
-    if (!reply.get()) return false;
+    // We only care about repair bucket replies
+    auto reply = std::dynamic_pointer_cast<RepairBucketReply>(internalReply);
+    if (!reply) {
+        return false;
+    }
 
     vespalib::MonitorGuard monitor(_wait);
     _lastResponseTime = _component.getClock().getTimeInSeconds();
     uint8_t disk = reply->getDisk();
+    assert(disk < _status.size());
     --_status[disk].pendingCount;
     LOG(spam, "Got repair reply for bucket %s: %s. %u messages still pending "
               "for disk %u. Bucket altered ? %s",
-        reply->getBucketId().toString().c_str(),
+        reply->getBucket().toString().c_str(),
         reply->getResult().toString().c_str(),
         _status[disk].pendingCount, disk,
         (reply->bucketAltered() ? "true" : "false"));
@@ -399,13 +387,13 @@ BucketIntegrityChecker::onInternalReply(
             ++_status[disk].checkedBuckets;
             LOGBP(debug, "Failed to repair bucket %s due to aborting request. "
                          "Likely bucket split/join or storage shutting down: %s",
-                  reply->getBucketId().toString().c_str(),
+                  reply->getBucket().toString().c_str(),
                   reply->getResult().toString().c_str());
         } else {
-            _status[disk].failedRepairs.push_back(reply->getBucketId());
+            _status[disk].failedRepairs.push_back(reply->getBucket());
             LOGBP(warning, "Failed to perform maintenance on bucket %s, "
                            "scheduled to be retried: %s",
-                  reply->getBucketId().toString().c_str(),
+                  reply->getBucket().toString().c_str(),
                   reply->getResult().toString().c_str());
         }
         if (_lastCycleCompleted) {
@@ -415,16 +403,6 @@ BucketIntegrityChecker::onInternalReply(
     return true;
 }
 
-bool
-BucketIntegrityChecker::onSetSystemState(
-        const std::shared_ptr<api::SetSystemStateCommand>& cmd)
-{
-    vespalib::MonitorGuard monitor(_wait);
-    _systemState = cmd->getSystemState();
-    return false;
-}
-
-
 SchedulingOptions::RunState
 BucketIntegrityChecker::getCurrentRunState(
         framework::SecondTime currentTime) const
@@ -449,9 +427,7 @@ BucketIntegrityChecker::getCurrentRunState(
           )
        )
     {   // If we're within region in day that we can run.
-//std::cerr << "We're inside time boundary. Current time: " << minutesOfDay << " (" << printMinutesOfDay(minutesOfDay) << "). Running between " << _scheduleOptions._dailyCycleStart << " (" << printMinutesOfDay(_scheduleOptions._dailyCycleStart) << ") - " << _scheduleOptions._dailyCycleStop << " (" << printMinutesOfDay(_scheduleOptions._dailyCycleStop) << ")\n";
         if (state == SchedulingOptions::CONTINUE) {
-//std::cerr << "We're in continue state.\n";
             // If we're in a continue state, set runstate if there's a current
             // run active that isn't completed yet, don't run otherwise.
             state = (_lastCycleCompleted
@@ -471,18 +447,13 @@ BucketIntegrityChecker::getCurrentRunState(
                 if (_currentRunWithFullVerification
                     || state == SchedulingOptions::RUN_CHEAP)
                 {
-//std::cerr << "Tagging dont run since too little time passed since last run\n" << "current time: " << currentTime << ", last start " << _lastCycleStart << ", min cycle time " << _scheduleOptions._minCycleTime << "\n";
                     state = SchedulingOptions::DONT_RUN;
                 } else {
-//std::cerr << "We can start new run. Last cycle started at " << _lastCycleStart.toString() << " current time is " << currentTime.toString() << " and min cycle time is " << _scheduleOptions._minCycleTime << "\n";
                 }
-            } else {
-//std::cerr << "Enough time passed? " << currentTime.toString() << " - " << _lastCycleStart.toString() << " >= " << _scheduleOptions._minCycleTime << "\n";
             }
         }
     } else {
             // If we're outside of time of day boundaries, don't run
-//std::cerr << "We're outside time boundary. Current time: " << minutesOfDay << " (" << printMinutesOfDay(minutesOfDay) << "). Only running between " << _scheduleOptions._dailyCycleStart << " (" << printMinutesOfDay(_scheduleOptions._dailyCycleStart) << ") - " << _scheduleOptions._dailyCycleStop << " (" << printMinutesOfDay(_scheduleOptions._dailyCycleStop) << ")\n";
         state = SchedulingOptions::DONT_RUN;
     }
     return state;
@@ -493,7 +464,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
 {
     while (!thread.interrupted()) {
         thread.registerTick(framework::PROCESS_CYCLE);
-            // Get the state based on the current time.
+        // Get the state based on the current time.
         framework::SecondTime currentTime(
                 _component.getClock().getTimeInSeconds());
 
@@ -543,8 +514,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
                     (_scheduleOptions._requestDelay
                      - (currentTime - _lastResponseTime)).getMillis());
             if (delay > _maxThreadWaitTime) delay = _maxThreadWaitTime;
-            monitor.wait(std::min(_maxThreadWaitTime.getTime(),
-                                  delay.getTime()));
+            monitor.wait(std::min(_maxThreadWaitTime.getTime(), delay.getTime()));
             thread.registerTick(framework::WAIT_CYCLE);
         } else if (_lastCycleCompleted && getTotalPendingCount() > 0) {
             LOG(spam, "Completed last cycle. Waiting until we have 0 pending "
@@ -558,31 +528,29 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
                 _scheduleOptions._maxPendingCount);
                 // Else we send up to max pending and wait for responses.
             if (_lastCycleCompleted) {
-                for (uint32_t i=0; i<_status.size(); ++i) {
-                    _status[i].state = DiskData::NOT_STARTED;
-                    _status[i].failedRepairs.clear();
-                    _status[i].checkedBuckets = 0;
-                    _status[i].retriedBuckets = 0;
+                for (auto& disk : _status) {
+                    disk.state = DiskData::NOT_STARTED;
+                    disk.failedRepairs.clear();
+                    disk.checkedBuckets = 0;
+                    disk.retriedBuckets = 0;
                 }
                 LOG(info, "Starting new verification/repair cycle at time %s.",
                     currentTime.toString().c_str());
                 _lastCycleStart = currentTime;
-                _cycleStartBucketCount = _component.getBucketDatabase(BucketSpace::placeHolder()).size();
+                _cycleStartBucketCount = 0;
+                for (auto space : _bucketSpaces) {
+                    _cycleStartBucketCount += _component.getBucketDatabase(space).size();
+                }
                 _lastCycleCompleted = false;
-                _currentRunWithFullVerification
-                    = (state == SchedulingOptions::RUN_FULL);
+                _currentRunWithFullVerification = (state == SchedulingOptions::RUN_FULL);
                 ++_cycleCount;
             }
-            for (uint32_t i=0; i<_status.size(); ++i) {
-                while (_status[i].pendingCount
-                            < _scheduleOptions._maxPendingCount)
-                {
-                    document::BucketId bid(_status[i].iterate(
-                            _component.getBucketDatabase(BucketSpace::placeHolder())));
-                    if (bid == document::BucketId(0, 0)) {
-                        LOG(debug, "Completed repair cycle for disk %u.", i);
-                        // If there is no next bucket, we might have completed
-                        // run
+            for (auto& disk : _status) {
+                while (disk.pendingCount < _scheduleOptions._maxPendingCount) {
+                    auto bucket = disk.iterate(_bucketSpaces, _component.getBucketSpaceRepo());
+                    if (bucket.getBucketId() == document::BucketId(0, 0)) {
+                        LOG(debug, "Completed repair cycle for disk %u.", disk.disk);
+                        // If there is no next bucket, we might have completed run
                         bool completed = true;
                         for (uint32_t j=0; j<_status.size(); ++j) {
                             if (!_status[j].done()) {
@@ -596,17 +564,15 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
                         }
                         break;
                     }
-                    document::Bucket bucket(BucketSpace::placeHolder(), bid);
-                    std::shared_ptr<RepairBucketCommand> cmd(
-                            new RepairBucketCommand(bucket, _status[i].disk));
+                    auto cmd = std::make_shared<RepairBucketCommand>(bucket, disk.disk);
                     cmd->verifyBody(_currentRunWithFullVerification);
                     cmd->moveToIdealDisk(true);
                     cmd->setPriority(230);
                     LOG(spam, "Sending new repair command for bucket %s. "
                               "After this, there will be %u pending on disk %u",
-                        bid.toString().c_str(),
-                        _status[i].pendingCount + 1, _status[i].disk);
-                    ++_status[i].pendingCount;
+                        bucket.toString().c_str(),
+                        disk.pendingCount + 1, disk.disk);
+                    ++disk.pendingCount;
                     dispatchDown(cmd);
                 }
             }
@@ -620,8 +586,8 @@ uint32_t
 BucketIntegrityChecker::getTotalPendingCount() const
 {
     uint32_t total = 0;
-    for (uint32_t i=0; i<_status.size(); ++i) {
-        total += _status[i].pendingCount;
+    for (auto& disk : _status) {
+        total += disk.pendingCount;
     }
     return total;
 }
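The reworked DiskData::iterate() above generalizes the old single-database scan: it resumes from currentBucket within the current bucket space, and when getNextId() comes back empty it either stops (last space exhausted) or resets the bucket cursor and moves on to the next space. The same control flow as a stand-alone sketch, with plain vectors standing in for the per-space bucket databases:

    #include <cstddef>
    #include <optional>
    #include <vector>

    struct Cursor {
        size_t spaceIndex = 0; // plays the role of currentBucketSpaceIndex
        size_t position = 0;   // plays the role of currentBucket

        // Each inner vector stands in for one bucket space's database.
        std::optional<int> next(const std::vector<std::vector<int>>& spaces) {
            while (spaceIndex < spaces.size()) {
                const auto& db = spaces[spaceIndex];
                if (position < db.size()) {
                    return db[position++]; // advance within the current space
                }
                ++spaceIndex;              // current space exhausted:
                position = 0;              // restart at the next one
            }
            return std::nullopt;           // all spaces exhausted
        }
    };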
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
index b169090ab3c..fb619a84c46 100644
--- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
+++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
@@ -10,6 +10,7 @@
 
 #pragma once
 
+#include <vespa/storage/common/content_bucket_space_repo.h>
 #include <vespa/storage/common/servicelayercomponent.h>
 #include <vespa/storage/common/storagelinkqueued.h>
 #include <vespa/storage/config/config-stor-integritychecker.h>
@@ -67,17 +68,20 @@ public:
          */
         enum State { NOT_STARTED, IN_PROGRESS, DONE };
 
+        size_t currentBucketSpaceIndex;
         document::BucketId currentBucket;
         uint32_t pendingCount;
         State state;
         uint8_t disk;
-        std::list<document::BucketId> failedRepairs;
+        std::list<document::Bucket> failedRepairs;
         uint32_t checkedBuckets;
         uint32_t retriedBuckets;
 
-        DiskData() : currentBucket(0), pendingCount(0),
-                     state(NOT_STARTED), disk(255),
-                     checkedBuckets(0), retriedBuckets(0) {}
+        DiskData()
+            : currentBucketSpaceIndex(0), currentBucket(0),
+              pendingCount(0), state(NOT_STARTED), disk(255),
+              checkedBuckets(0), retriedBuckets(0)
+        {}
 
         bool done() const; // Whether we're still working on this disk
         bool working() const; // Whether we've stated and not finished
@@ -85,11 +89,14 @@ public:
          * Get the next bucket to repair. If no more to iterate, random bucket
          * is returned. Check if done() afterwards.
          */
-        document::BucketId iterate(StorBucketDatabase&);
+        document::Bucket iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces,
+                                 const ContentBucketSpaceRepo& bucketSpaceRepo);
     };
 
 private:
+    ServiceLayerComponent _component;
     uint32_t _cycleCount;
+    ContentBucketSpaceRepo::BucketSpaces _bucketSpaces;
     std::vector<DiskData> _status;
     framework::SecondTime _lastCycleStart;
     uint32_t _cycleStartBucketCount;
@@ -98,11 +105,9 @@ private:
     bool _currentRunWithFullVerification;
     bool _verifyAllRepairs;
     SchedulingOptions _scheduleOptions;
-    lib::ClusterState _systemState;
     vespalib::Monitor _wait;
     config::ConfigFetcher _configFetcher;
     framework::MilliSecTime _maxThreadWaitTime;
-    ServiceLayerComponent _component;
     framework::Thread::UP _thread;
 
     BucketIntegrityChecker(const BucketIntegrityChecker &);
LOG(spam, "Sending new repair command for bucket %s. " "After this, there will be %u pending on disk %u", - bid.toString().c_str(), - _status[i].pendingCount + 1, _status[i].disk); - ++_status[i].pendingCount; + bucket.toString().c_str(), + disk.pendingCount + 1, disk.disk); + ++disk.pendingCount; dispatchDown(cmd); } } @@ -620,8 +586,8 @@ uint32_t BucketIntegrityChecker::getTotalPendingCount() const { uint32_t total = 0; - for (uint32_t i=0; i<_status.size(); ++i) { - total += _status[i].pendingCount; + for (auto& disk : _status) { + total += disk.pendingCount; } return total; } diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h index b169090ab3c..fb619a84c46 100644 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h +++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h @@ -10,6 +10,7 @@ #pragma once +#include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/common/storagelinkqueued.h> #include <vespa/storage/config/config-stor-integritychecker.h> @@ -67,17 +68,20 @@ public: */ enum State { NOT_STARTED, IN_PROGRESS, DONE }; + size_t currentBucketSpaceIndex; document::BucketId currentBucket; uint32_t pendingCount; State state; uint8_t disk; - std::list<document::BucketId> failedRepairs; + std::list<document::Bucket> failedRepairs; uint32_t checkedBuckets; uint32_t retriedBuckets; - DiskData() : currentBucket(0), pendingCount(0), - state(NOT_STARTED), disk(255), - checkedBuckets(0), retriedBuckets(0) {} + DiskData() + : currentBucketSpaceIndex(0), currentBucket(0), + pendingCount(0), state(NOT_STARTED), disk(255), + checkedBuckets(0), retriedBuckets(0) + {} bool done() const; // Whether we're still working on this disk bool working() const; // Whether we've stated and not finished @@ -85,11 +89,14 @@ public: * Get the next bucket to repair. If no more to iterate, random bucket * is returned. Check if done() afterwards. */ - document::BucketId iterate(StorBucketDatabase&); + document::Bucket iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces, + const ContentBucketSpaceRepo& bucketSpaceRepo); }; private: + ServiceLayerComponent _component; uint32_t _cycleCount; + ContentBucketSpaceRepo::BucketSpaces _bucketSpaces; std::vector<DiskData> _status; framework::SecondTime _lastCycleStart; uint32_t _cycleStartBucketCount; @@ -98,11 +105,9 @@ private: bool _currentRunWithFullVerification; bool _verifyAllRepairs; SchedulingOptions _scheduleOptions; - lib::ClusterState _systemState; vespalib::Monitor _wait; config::ConfigFetcher _configFetcher; framework::MilliSecTime _maxThreadWaitTime; - ServiceLayerComponent _component; framework::Thread::UP _thread; BucketIntegrityChecker(const BucketIntegrityChecker &); @@ -130,7 +135,6 @@ private: void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override; void onDoneInit() override; bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; - bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; bool onNotifyBucketChangeReply(const std::shared_ptr<api::NotifyBucketChangeReply>&) override { return true; } SchedulingOptions::RunState getCurrentRunState(framework::SecondTime time) const; void run(framework::ThreadHandle&) override; |