diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-11-17 16:23:53 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-11-20 12:41:41 +0000 |
commit | 62d5d7b45e3a1fecbe2e1180b84bcc927c49d37e (patch) | |
tree | 4ac866cb50e2e7bb733fe6f946a11332ad06e31b /storage | |
parent | 16ef3fdbc97db934bf0b8593ce79e7e94a17cf9d (diff) |
Iterate over all bucket spaces during bucket integrity checking
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp | 140 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/bucketintegritychecker.h | 18 |
2 files changed, 73 insertions, 85 deletions
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp index 0122fc935e7..b4c7d1e3e80 100644 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp +++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp @@ -19,31 +19,6 @@ using document::BucketSpace; namespace storage { namespace { - /* - std::string printDate(time_t time) { - char date[26]; - struct tm datestruct; - struct tm* datestructptr = gmtime_r(&time, &datestruct); - assert(datestructptr); - char* result = asctime_r(&datestruct, date); - size_t size = strlen(result); - while (size > 0) { - bool stop = false; - switch (result[size - 1]) { - case '\n': - case '\r': - case '\f': - case '\t': - --size; - default: - stop = true; - break; - } - if (stop) break; - } - return std::string(result, size); - } - */ std::string printMinutesOfDay(uint32_t minutesOfDay) { std::ostringstream ost; @@ -131,7 +106,7 @@ struct NextEntryFinder { _first = false; return StorBucketDatabase::CONTINUE; } else { - _next.reset(new document::BucketId(bucket)); + _next = std::make_unique<document::BucketId>(bucket); return StorBucketDatabase::ABORT; } } @@ -145,48 +120,58 @@ std::unique_ptr<document::BucketId> getNextId(StorBucketDatabase& database, database.each(proc, "BucketIntegrityChecker::getNextId", last.toKey()); return std::move(proc._next); } + +bool allBucketSpacesExhausted(size_t index, const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces) noexcept { + return (index == bucketSpaces.size() - 1); +} + } // End of anonymous namespace -document::BucketId -BucketIntegrityChecker::DiskData::iterate(StorBucketDatabase& bucketDatabase) +document::Bucket +BucketIntegrityChecker::DiskData::iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces, + const ContentBucketSpaceRepo& bucketSpaceRepo) { static uint32_t i=0; // Resend failed buckets once in a while - if (failedRepairs.size() > 0 && ++i % 10 == 9) - { - document::BucketId bid(failedRepairs.front()); + if (!failedRepairs.empty() && ++i % 10 == 9) { + document::Bucket bucket(failedRepairs.front()); LOG(spam, "Scheduling next bucket %s from failed repairs list", - bid.toString().c_str()); + bucket.toString().c_str()); failedRepairs.pop_front(); ++retriedBuckets; - return bid; + return bucket; } if (state == NOT_STARTED) { // Guaranteed to be before all buckets. currentBucket = document::BucketId(0, 0); + currentBucketSpaceIndex = 0; } - if (state != DONE) { - std::unique_ptr<document::BucketId> bid( - getNextId(bucketDatabase, currentBucket, disk)); + while (state != DONE) { + const auto currentSpace = bucketSpaces[currentBucketSpaceIndex]; + const auto bid = getNextId(bucketSpaceRepo.get(currentSpace).bucketDatabase(), currentBucket, disk); if (bid) { state = IN_PROGRESS; currentBucket = *bid; - return currentBucket; - } else { + return document::Bucket(currentSpace, currentBucket); + } else if (allBucketSpacesExhausted(currentBucketSpaceIndex, bucketSpaces)) { state = DONE; + break; + } else { + ++currentBucketSpaceIndex; + currentBucket = document::BucketId(0, 0); } } // If we didn't schedule repaired, but we ended up not having any other, // take repaired once anyways if (!failedRepairs.empty()) { - document::BucketId bid(failedRepairs.front()); + document::Bucket bucket(failedRepairs.front()); LOG(spam, "Done iterating, scheduling next bucket %s from failed " - "repairs list", bid.toString().c_str()); + "repairs list", bucket.toString().c_str()); failedRepairs.pop_front(); ++retriedBuckets; - return bid; + return bucket; } - return document::BucketId(0, 0); + return document::Bucket(bucketSpaces[currentBucketSpaceIndex], document::BucketId(0, 0)); } BucketIntegrityChecker::BucketIntegrityChecker( @@ -196,7 +181,9 @@ BucketIntegrityChecker::BucketIntegrityChecker( Runnable(), framework::HtmlStatusReporter("bucketintegritychecker", "Bucket integrity checker"), + _component(compReg, "bucketintegritychecker"), _cycleCount(0), + _bucketSpaces(_component.getBucketSpaceRepo().getBucketSpaces()), _status(), _lastCycleStart(0), _cycleStartBucketCount(0), @@ -207,9 +194,9 @@ BucketIntegrityChecker::BucketIntegrityChecker( _scheduleOptions(), _wait(), _configFetcher(configUri.getContext()), - _maxThreadWaitTime(60 * 1000), - _component(compReg, "bucketintegritychecker") + _maxThreadWaitTime(60 * 1000) { + assert(!_bucketSpaces.empty()); LOG(debug, "Configuring bucket integrity checker to work with %u disks.", _component.getDiskCount()); _status.resize(_component.getDiskCount()); @@ -367,8 +354,7 @@ BucketIntegrityChecker::onInternalReply( const std::shared_ptr<api::InternalReply>& internalReply) { // We only care about repair bucket replies - shared_ptr<RepairBucketReply> reply( - std::dynamic_pointer_cast<RepairBucketReply>(internalReply)); + auto reply = std::dynamic_pointer_cast<RepairBucketReply>(internalReply); if (!reply) { return false; } @@ -376,10 +362,11 @@ BucketIntegrityChecker::onInternalReply( vespalib::MonitorGuard monitor(_wait); _lastResponseTime = _component.getClock().getTimeInSeconds(); uint8_t disk = reply->getDisk(); + assert(disk < _status.size()); --_status[disk].pendingCount; LOG(spam, "Got repair reply for bucket %s: %s. %u messages still pending " "for disk %u. Bucket altered ? %s", - reply->getBucketId().toString().c_str(), + reply->getBucket().toString().c_str(), reply->getResult().toString().c_str(), _status[disk].pendingCount, disk, (reply->bucketAltered() ? "true" : "false")); @@ -400,13 +387,13 @@ BucketIntegrityChecker::onInternalReply( ++_status[disk].checkedBuckets; LOGBP(debug, "Failed to repair bucket %s due to aborting request. " "Likely bucket split/join or storage shutting down: %s", - reply->getBucketId().toString().c_str(), + reply->getBucket().toString().c_str(), reply->getResult().toString().c_str()); } else { - _status[disk].failedRepairs.push_back(reply->getBucketId()); + _status[disk].failedRepairs.push_back(reply->getBucket()); LOGBP(warning, "Failed to perform maintenance on bucket %s, " "scheduled to be retried: %s", - reply->getBucketId().toString().c_str(), + reply->getBucket().toString().c_str(), reply->getResult().toString().c_str()); } if (_lastCycleCompleted) { @@ -477,7 +464,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread) { while (!thread.interrupted()) { thread.registerTick(framework::PROCESS_CYCLE); - // Get the state based on the current time. + // Get the state based on the current time. framework::SecondTime currentTime( _component.getClock().getTimeInSeconds()); @@ -527,8 +514,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread) (_scheduleOptions._requestDelay - (currentTime - _lastResponseTime)).getMillis()); if (delay > _maxThreadWaitTime) delay = _maxThreadWaitTime; - monitor.wait(std::min(_maxThreadWaitTime.getTime(), - delay.getTime())); + monitor.wait(std::min(_maxThreadWaitTime.getTime(), delay.getTime())); thread.registerTick(framework::WAIT_CYCLE); } else if (_lastCycleCompleted && getTotalPendingCount() > 0) { LOG(spam, "Completed last cycle. Waiting until we have 0 pending " @@ -542,31 +528,29 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread) _scheduleOptions._maxPendingCount); // Else we send up to max pending and wait for responses. if (_lastCycleCompleted) { - for (uint32_t i=0; i<_status.size(); ++i) { - _status[i].state = DiskData::NOT_STARTED; - _status[i].failedRepairs.clear(); - _status[i].checkedBuckets = 0; - _status[i].retriedBuckets = 0; + for (auto& disk : _status) { + disk.state = DiskData::NOT_STARTED; + disk.failedRepairs.clear(); + disk.checkedBuckets = 0; + disk.retriedBuckets = 0; } LOG(info, "Starting new verification/repair cycle at time %s.", currentTime.toString().c_str()); _lastCycleStart = currentTime; - _cycleStartBucketCount = _component.getBucketDatabase(BucketSpace::placeHolder()).size(); + _cycleStartBucketCount = 0; + for (auto space : _bucketSpaces) { + _cycleStartBucketCount += _component.getBucketDatabase(space).size(); + } _lastCycleCompleted = false; - _currentRunWithFullVerification - = (state == SchedulingOptions::RUN_FULL); + _currentRunWithFullVerification = (state == SchedulingOptions::RUN_FULL); ++_cycleCount; } - for (uint32_t i=0; i<_status.size(); ++i) { - while (_status[i].pendingCount - < _scheduleOptions._maxPendingCount) - { - document::BucketId bid(_status[i].iterate( - _component.getBucketDatabase(BucketSpace::placeHolder()))); - if (bid == document::BucketId(0, 0)) { - LOG(debug, "Completed repair cycle for disk %u.", i); - // If there is no next bucket, we might have completed - // run + for (auto& disk : _status) { + while (disk.pendingCount < _scheduleOptions._maxPendingCount) { + auto bucket = disk.iterate(_bucketSpaces, _component.getBucketSpaceRepo()); + if (bucket.getBucketId() == document::BucketId(0, 0)) { + LOG(debug, "Completed repair cycle for disk %u.", disk.disk); + // If there is no next bucket, we might have completed run bool completed = true; for (uint32_t j=0; j<_status.size(); ++j) { if (!_status[j].done()) { @@ -580,17 +564,15 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread) } break; } - document::Bucket bucket(BucketSpace::placeHolder(), bid); - std::shared_ptr<RepairBucketCommand> cmd( - new RepairBucketCommand(bucket, _status[i].disk)); + auto cmd = std::make_shared<RepairBucketCommand>(bucket, disk.disk); cmd->verifyBody(_currentRunWithFullVerification); cmd->moveToIdealDisk(true); cmd->setPriority(230); LOG(spam, "Sending new repair command for bucket %s. " "After this, there will be %u pending on disk %u", - bid.toString().c_str(), - _status[i].pendingCount + 1, _status[i].disk); - ++_status[i].pendingCount; + bucket.toString().c_str(), + disk.pendingCount + 1, disk.disk); + ++disk.pendingCount; dispatchDown(cmd); } } @@ -604,8 +586,8 @@ uint32_t BucketIntegrityChecker::getTotalPendingCount() const { uint32_t total = 0; - for (uint32_t i=0; i<_status.size(); ++i) { - total += _status[i].pendingCount; + for (auto& disk : _status) { + total += disk.pendingCount; } return total; } diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h index 3134ad1d749..fb619a84c46 100644 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h +++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h @@ -10,6 +10,7 @@ #pragma once +#include <vespa/storage/common/content_bucket_space_repo.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/common/storagelinkqueued.h> #include <vespa/storage/config/config-stor-integritychecker.h> @@ -67,17 +68,20 @@ public: */ enum State { NOT_STARTED, IN_PROGRESS, DONE }; + size_t currentBucketSpaceIndex; document::BucketId currentBucket; uint32_t pendingCount; State state; uint8_t disk; - std::list<document::BucketId> failedRepairs; + std::list<document::Bucket> failedRepairs; uint32_t checkedBuckets; uint32_t retriedBuckets; - DiskData() : currentBucket(0), pendingCount(0), - state(NOT_STARTED), disk(255), - checkedBuckets(0), retriedBuckets(0) {} + DiskData() + : currentBucketSpaceIndex(0), currentBucket(0), + pendingCount(0), state(NOT_STARTED), disk(255), + checkedBuckets(0), retriedBuckets(0) + {} bool done() const; // Whether we're still working on this disk bool working() const; // Whether we've stated and not finished @@ -85,11 +89,14 @@ public: * Get the next bucket to repair. If no more to iterate, random bucket * is returned. Check if done() afterwards. */ - document::BucketId iterate(StorBucketDatabase&); + document::Bucket iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces, + const ContentBucketSpaceRepo& bucketSpaceRepo); }; private: + ServiceLayerComponent _component; uint32_t _cycleCount; + ContentBucketSpaceRepo::BucketSpaces _bucketSpaces; std::vector<DiskData> _status; framework::SecondTime _lastCycleStart; uint32_t _cycleStartBucketCount; @@ -101,7 +108,6 @@ private: vespalib::Monitor _wait; config::ConfigFetcher _configFetcher; framework::MilliSecTime _maxThreadWaitTime; - ServiceLayerComponent _component; framework::Thread::UP _thread; BucketIntegrityChecker(const BucketIntegrityChecker &); |