about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahoo-inc.com> 2017-11-17 16:23:53 +0000
committer Tor Brede Vekterli <vekterli@yahoo-inc.com> 2017-11-20 12:41:41 +0000
commit 62d5d7b45e3a1fecbe2e1180b84bcc927c49d37e (patch)
tree 4ac866cb50e2e7bb733fe6f946a11332ad06e31b /storage
parent 16ef3fdbc97db934bf0b8593ce79e7e94a17cf9d (diff)
Iterate over all bucket spaces during bucket integrity checking
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp 140
-rw-r--r-- storage/src/vespa/storage/storageserver/bucketintegritychecker.h 18
2 files changed, 73 insertions, 85 deletions
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
index 0122fc935e7..b4c7d1e3e80 100644
--- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
+++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp
@@ -19,31 +19,6 @@ using document::BucketSpace;
namespace storage {
namespace {
- /*
- std::string printDate(time_t time) {
- char date[26];
- struct tm datestruct;
- struct tm* datestructptr = gmtime_r(&time, &datestruct);
- assert(datestructptr);
- char* result = asctime_r(&datestruct, date);
- size_t size = strlen(result);
- while (size > 0) {
- bool stop = false;
- switch (result[size - 1]) {
- case '\n':
- case '\r':
- case '\f':
- case '\t':
- --size;
- default:
- stop = true;
- break;
- }
- if (stop) break;
- }
- return std::string(result, size);
- }
- */
std::string printMinutesOfDay(uint32_t minutesOfDay) {
std::ostringstream ost;
@@ -131,7 +106,7 @@ struct NextEntryFinder {
_first = false;
return StorBucketDatabase::CONTINUE;
} else {
- _next.reset(new document::BucketId(bucket));
+ _next = std::make_unique<document::BucketId>(bucket);
return StorBucketDatabase::ABORT;
}
}
@@ -145,48 +120,58 @@ std::unique_ptr<document::BucketId> getNextId(StorBucketDatabase& database,
database.each(proc, "BucketIntegrityChecker::getNextId", last.toKey());
return std::move(proc._next);
}
+
+bool allBucketSpacesExhausted(size_t index, const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces) noexcept {
+ return (index == bucketSpaces.size() - 1);
+}
+
} // End of anonymous namespace
-document::BucketId
-BucketIntegrityChecker::DiskData::iterate(StorBucketDatabase& bucketDatabase)
+document::Bucket
+BucketIntegrityChecker::DiskData::iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces,
+ const ContentBucketSpaceRepo& bucketSpaceRepo)
{
static uint32_t i=0;
// Resend failed buckets once in a while
- if (failedRepairs.size() > 0 && ++i % 10 == 9)
- {
- document::BucketId bid(failedRepairs.front());
+ if (!failedRepairs.empty() && ++i % 10 == 9) {
+ document::Bucket bucket(failedRepairs.front());
LOG(spam, "Scheduling next bucket %s from failed repairs list",
- bid.toString().c_str());
+ bucket.toString().c_str());
failedRepairs.pop_front();
++retriedBuckets;
- return bid;
+ return bucket;
}
if (state == NOT_STARTED) {
// Guaranteed to be before all buckets.
currentBucket = document::BucketId(0, 0);
+ currentBucketSpaceIndex = 0;
}
- if (state != DONE) {
- std::unique_ptr<document::BucketId> bid(
- getNextId(bucketDatabase, currentBucket, disk));
+ while (state != DONE) {
+ const auto currentSpace = bucketSpaces[currentBucketSpaceIndex];
+ const auto bid = getNextId(bucketSpaceRepo.get(currentSpace).bucketDatabase(), currentBucket, disk);
if (bid) {
state = IN_PROGRESS;
currentBucket = *bid;
- return currentBucket;
- } else {
+ return document::Bucket(currentSpace, currentBucket);
+ } else if (allBucketSpacesExhausted(currentBucketSpaceIndex, bucketSpaces)) {
state = DONE;
+ break;
+ } else {
+ ++currentBucketSpaceIndex;
+ currentBucket = document::BucketId(0, 0);
}
}
// If we didn't schedule repaired, but we ended up not having any other,
// take repaired once anyways
if (!failedRepairs.empty()) {
- document::BucketId bid(failedRepairs.front());
+ document::Bucket bucket(failedRepairs.front());
LOG(spam, "Done iterating, scheduling next bucket %s from failed "
- "repairs list", bid.toString().c_str());
+ "repairs list", bucket.toString().c_str());
failedRepairs.pop_front();
++retriedBuckets;
- return bid;
+ return bucket;
}
- return document::BucketId(0, 0);
+ return document::Bucket(bucketSpaces[currentBucketSpaceIndex], document::BucketId(0, 0));
}
BucketIntegrityChecker::BucketIntegrityChecker(
@@ -196,7 +181,9 @@ BucketIntegrityChecker::BucketIntegrityChecker(
Runnable(),
framework::HtmlStatusReporter("bucketintegritychecker",
"Bucket integrity checker"),
+ _component(compReg, "bucketintegritychecker"),
_cycleCount(0),
+ _bucketSpaces(_component.getBucketSpaceRepo().getBucketSpaces()),
_status(),
_lastCycleStart(0),
_cycleStartBucketCount(0),
@@ -207,9 +194,9 @@ BucketIntegrityChecker::BucketIntegrityChecker(
_scheduleOptions(),
_wait(),
_configFetcher(configUri.getContext()),
- _maxThreadWaitTime(60 * 1000),
- _component(compReg, "bucketintegritychecker")
+ _maxThreadWaitTime(60 * 1000)
{
+ assert(!_bucketSpaces.empty());
LOG(debug, "Configuring bucket integrity checker to work with %u disks.",
_component.getDiskCount());
_status.resize(_component.getDiskCount());
@@ -367,8 +354,7 @@ BucketIntegrityChecker::onInternalReply(
const std::shared_ptr<api::InternalReply>& internalReply)
{
// We only care about repair bucket replies
- shared_ptr<RepairBucketReply> reply(
- std::dynamic_pointer_cast<RepairBucketReply>(internalReply));
+ auto reply = std::dynamic_pointer_cast<RepairBucketReply>(internalReply);
if (!reply) {
return false;
}
@@ -376,10 +362,11 @@ BucketIntegrityChecker::onInternalReply(
vespalib::MonitorGuard monitor(_wait);
_lastResponseTime = _component.getClock().getTimeInSeconds();
uint8_t disk = reply->getDisk();
+ assert(disk < _status.size());
--_status[disk].pendingCount;
LOG(spam, "Got repair reply for bucket %s: %s. %u messages still pending "
"for disk %u. Bucket altered ? %s",
- reply->getBucketId().toString().c_str(),
+ reply->getBucket().toString().c_str(),
reply->getResult().toString().c_str(),
_status[disk].pendingCount, disk,
(reply->bucketAltered() ? "true" : "false"));
@@ -400,13 +387,13 @@ BucketIntegrityChecker::onInternalReply(
++_status[disk].checkedBuckets;
LOGBP(debug, "Failed to repair bucket %s due to aborting request. "
"Likely bucket split/join or storage shutting down: %s",
- reply->getBucketId().toString().c_str(),
+ reply->getBucket().toString().c_str(),
reply->getResult().toString().c_str());
} else {
- _status[disk].failedRepairs.push_back(reply->getBucketId());
+ _status[disk].failedRepairs.push_back(reply->getBucket());
LOGBP(warning, "Failed to perform maintenance on bucket %s, "
"scheduled to be retried: %s",
- reply->getBucketId().toString().c_str(),
+ reply->getBucket().toString().c_str(),
reply->getResult().toString().c_str());
}
if (_lastCycleCompleted) {
@@ -477,7 +464,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
{
while (!thread.interrupted()) {
thread.registerTick(framework::PROCESS_CYCLE);
- // Get the state based on the current time.
+ // Get the state based on the current time.
framework::SecondTime currentTime(
_component.getClock().getTimeInSeconds());
@@ -527,8 +514,7 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
(_scheduleOptions._requestDelay
- (currentTime - _lastResponseTime)).getMillis());
if (delay > _maxThreadWaitTime) delay = _maxThreadWaitTime;
- monitor.wait(std::min(_maxThreadWaitTime.getTime(),
- delay.getTime()));
+ monitor.wait(std::min(_maxThreadWaitTime.getTime(), delay.getTime()));
thread.registerTick(framework::WAIT_CYCLE);
} else if (_lastCycleCompleted && getTotalPendingCount() > 0) {
LOG(spam, "Completed last cycle. Waiting until we have 0 pending "
@@ -542,31 +528,29 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
_scheduleOptions._maxPendingCount);
// Else we send up to max pending and wait for responses.
if (_lastCycleCompleted) {
- for (uint32_t i=0; i<_status.size(); ++i) {
- _status[i].state = DiskData::NOT_STARTED;
- _status[i].failedRepairs.clear();
- _status[i].checkedBuckets = 0;
- _status[i].retriedBuckets = 0;
+ for (auto& disk : _status) {
+ disk.state = DiskData::NOT_STARTED;
+ disk.failedRepairs.clear();
+ disk.checkedBuckets = 0;
+ disk.retriedBuckets = 0;
}
LOG(info, "Starting new verification/repair cycle at time %s.",
currentTime.toString().c_str());
_lastCycleStart = currentTime;
- _cycleStartBucketCount = _component.getBucketDatabase(BucketSpace::placeHolder()).size();
+ _cycleStartBucketCount = 0;
+ for (auto space : _bucketSpaces) {
+ _cycleStartBucketCount += _component.getBucketDatabase(space).size();
+ }
_lastCycleCompleted = false;
- _currentRunWithFullVerification
- = (state == SchedulingOptions::RUN_FULL);
+ _currentRunWithFullVerification = (state == SchedulingOptions::RUN_FULL);
++_cycleCount;
}
- for (uint32_t i=0; i<_status.size(); ++i) {
- while (_status[i].pendingCount
- < _scheduleOptions._maxPendingCount)
- {
- document::BucketId bid(_status[i].iterate(
- _component.getBucketDatabase(BucketSpace::placeHolder())));
- if (bid == document::BucketId(0, 0)) {
- LOG(debug, "Completed repair cycle for disk %u.", i);
- // If there is no next bucket, we might have completed
- // run
+ for (auto& disk : _status) {
+ while (disk.pendingCount < _scheduleOptions._maxPendingCount) {
+ auto bucket = disk.iterate(_bucketSpaces, _component.getBucketSpaceRepo());
+ if (bucket.getBucketId() == document::BucketId(0, 0)) {
+ LOG(debug, "Completed repair cycle for disk %u.", disk.disk);
+ // If there is no next bucket, we might have completed run
bool completed = true;
for (uint32_t j=0; j<_status.size(); ++j) {
if (!_status[j].done()) {
@@ -580,17 +564,15 @@ BucketIntegrityChecker::run(framework::ThreadHandle& thread)
}
break;
}
- document::Bucket bucket(BucketSpace::placeHolder(), bid);
- std::shared_ptr<RepairBucketCommand> cmd(
- new RepairBucketCommand(bucket, _status[i].disk));
+ auto cmd = std::make_shared<RepairBucketCommand>(bucket, disk.disk);
cmd->verifyBody(_currentRunWithFullVerification);
cmd->moveToIdealDisk(true);
cmd->setPriority(230);
LOG(spam, "Sending new repair command for bucket %s. "
"After this, there will be %u pending on disk %u",
- bid.toString().c_str(),
- _status[i].pendingCount + 1, _status[i].disk);
- ++_status[i].pendingCount;
+ bucket.toString().c_str(),
+ disk.pendingCount + 1, disk.disk);
+ ++disk.pendingCount;
dispatchDown(cmd);
}
}
@@ -604,8 +586,8 @@ uint32_t
BucketIntegrityChecker::getTotalPendingCount() const
{
uint32_t total = 0;
- for (uint32_t i=0; i<_status.size(); ++i) {
- total += _status[i].pendingCount;
+ for (auto& disk : _status) {
+ total += disk.pendingCount;
}
return total;
}
diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
index 3134ad1d749..fb619a84c46 100644
--- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
+++ b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h
@@ -10,6 +10,7 @@
#pragma once
+#include <vespa/storage/common/content_bucket_space_repo.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/common/storagelinkqueued.h>
#include <vespa/storage/config/config-stor-integritychecker.h>
@@ -67,17 +68,20 @@ public:
*/
enum State { NOT_STARTED, IN_PROGRESS, DONE };
+ size_t currentBucketSpaceIndex;
document::BucketId currentBucket;
uint32_t pendingCount;
State state;
uint8_t disk;
- std::list<document::BucketId> failedRepairs;
+ std::list<document::Bucket> failedRepairs;
uint32_t checkedBuckets;
uint32_t retriedBuckets;
- DiskData() : currentBucket(0), pendingCount(0),
- state(NOT_STARTED), disk(255),
- checkedBuckets(0), retriedBuckets(0) {}
+ DiskData()
+ : currentBucketSpaceIndex(0), currentBucket(0),
+ pendingCount(0), state(NOT_STARTED), disk(255),
+ checkedBuckets(0), retriedBuckets(0)
+ {}
bool done() const; // Whether we're still working on this disk
bool working() const; // Whether we've started and not finished
@@ -85,11 +89,14 @@ public:
* Get the next bucket to repair. If no more to iterate, random bucket
* is returned. Check if done() afterwards.
*/
- document::BucketId iterate(StorBucketDatabase&);
+ document::Bucket iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces,
+ const ContentBucketSpaceRepo& bucketSpaceRepo);
};
private:
+ ServiceLayerComponent _component;
uint32_t _cycleCount;
+ ContentBucketSpaceRepo::BucketSpaces _bucketSpaces;
std::vector<DiskData> _status;
framework::SecondTime _lastCycleStart;
uint32_t _cycleStartBucketCount;
@@ -101,7 +108,6 @@ private:
vespalib::Monitor _wait;
config::ConfigFetcher _configFetcher;
framework::MilliSecTime _maxThreadWaitTime;
- ServiceLayerComponent _component;
framework::Thread::UP _thread;
BucketIntegrityChecker(const BucketIntegrityChecker &);