diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-30 19:11:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-30 19:11:41 +0200 |
commit | 4a6858ec6f1cfd6517670b0d0132be51c625edac (patch) | |
tree | 157e2b3042684c89244f842507c98a3c00b640fe | |
parent | f3c67d39861da9138f2948998435d502008442c8 (diff) | |
parent | 21bdff69e115dedc6dcfd7478af51cc6b2568dcf (diff) |
Merge pull request #13126 from vespa-engine/vekterli/remove-deprecated-bucket-integrity-checker
Remove deprecated bucket integrity checker
25 files changed, 1 insertions, 1358 deletions
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp index 298e29cbecb..b45a4131e7c 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp @@ -2080,23 +2080,6 @@ TEST_F(ConformanceTest, testJoinSameSourceBucketsTargetExists) } } -TEST_F(ConformanceTest, testMaintain) -{ - document::TestDocMan testDocMan; - _factory->clear(); - PersistenceProvider::UP spi(getSpi(*_factory, testDocMan)); - Context context(defaultLoadType, Priority(0), Trace::TraceLevel(0)); - Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x01, 1); - - Bucket bucket(makeSpiBucket(BucketId(8, 0x01))); - spi->createBucket(bucket, context); - - spi->put(bucket, Timestamp(3), doc1, context); - - EXPECT_EQ(Result::ErrorType::NONE, - spi->maintain(bucket, LOW).getErrorCode()); -} - TEST_F(ConformanceTest, testGetModifiedBuckets) { document::TestDocMan testDocMan; diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 3f05ed36802..984bbcd845f 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -303,8 +303,7 @@ DummyPersistence::DummyPersistence( _nextIterator(1), _iterators(), _monitor(), - _clusterState(), - _simulateMaintainFailure(false) + _clusterState() {} DummyPersistence::~DummyPersistence() {} @@ -465,27 +464,6 @@ DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&) return Result(); } -Result -DummyPersistence::maintain(const Bucket& b, MaintenanceLevel) -{ - assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - if (_simulateMaintainFailure) { - BucketContentGuard::UP bc(acquireBucketWithLock(b)); - if (!bc.get()) { - return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); - } - - if (!(*bc)->_entries.empty()) { - // Simulate a corruption in a document, remove it. - (*bc)->_entries.pop_back(); - } - (*bc)->setOutdatedInfo(true); - _simulateMaintainFailure = false; - } - - return Result(); -} - RemoveResult DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 88e17a90a98..27ed95bd6ee 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -174,7 +174,6 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; Result revert(const Bucket&, Timestamp, Context&); - Result maintain(const Bucket& bucket, MaintenanceLevel level) override; /** * The following methods are used only for unit testing. @@ -195,10 +194,6 @@ public: return *_clusterState; } - void simulateMaintenanceFailure() { - _simulateMaintainFailure = true; - } - private: friend class BucketContentGuard; // Const since funcs only alter mutable field in BucketContent @@ -218,8 +213,6 @@ private: std::unique_ptr<ClusterState> _clusterState; - bool _simulateMaintainFailure; - std::unique_ptr<document::select::Node> parseDocumentSelection( const string& documentSelection, bool allowLeaf); diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index cbd76951004..e346fdaa3cb 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -33,11 +33,6 @@ public: /** * Default impl is empty. */ - Result maintain(const Bucket&, MaintenanceLevel) override { return Result(); } - - /** - * Default impl is empty. - */ Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } /** diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 70645b31902..338fa6e03b0 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -357,15 +357,6 @@ struct PersistenceProvider virtual BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const = 0; /** - * Allows the provider to do periodic maintenance and verification. - * - * @param level The level of maintenance to do. LOW maintenance is - * scheduled more often than HIGH maintenance, allowing costly operations - * to be run less. - */ - virtual Result maintain(const Bucket&, MaintenanceLevel level) = 0; - - /** * Splits the source bucket into the two target buckets. * After the split, all documents belonging to target1 should be * in that bucket, and all documents belonging to target2 should be diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index 511d0e3fee8..8d8665b645e 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -134,12 +134,6 @@ DownPersistence::getModifiedBuckets(BucketSpace) const Result -DownPersistence::maintain(const Bucket&, MaintenanceLevel) -{ - return errorResult; -} - -Result DownPersistence::split(const Bucket&, const Bucket&, const Bucket&, Context&) { return errorResult; diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 2ae0605fdb6..2716b0ee3b4 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -47,7 +47,6 @@ public: Result createBucket(const Bucket&, Context&) override; Result deleteBucket(const Bucket&, Context&) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - Result maintain(const Bucket&, MaintenanceLevel level) override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; Result move(const Bucket&, PartitionId target, Context&) override; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 48074d491c8..8344996e298 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -626,12 +626,6 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck } -Result -PersistenceEngine::maintain(const Bucket& , MaintenanceLevel) -{ - return Result(); -} - void PersistenceEngine::destroyIterators() { diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index d28a00b909e..43239e95a96 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -117,8 +117,6 @@ public: Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; - Result maintain(const Bucket&, MaintenanceLevel) override; - void destroyIterators(); void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler); void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 2cb390fca86..6497f8bb698 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -425,47 +425,6 @@ TEST_F(FileStorManagerTest, state_change) { EXPECT_FALSE(getDummyPersistence().getClusterState().nodeUp()); } -TEST_F(FileStorManagerTest, repair_notifies_distributor_on_change) { - // Setting up manager - DummyStorageLink top; - FileStorManager *manager; - top.push_back(unique_ptr<StorageLink>(manager = - new FileStorManager(config->getConfigId(), _node->getPartitions(), _node->getPersistenceProvider(), _node->getComponentRegister()))); - setClusterState("storage:1 distributor:1"); - top.open(); - - createBucket(document::BucketId(16, 1), 0); - - api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); - - // Creating a document to test with - - for (uint32_t i = 0; i < 3; ++i) { - document::DocumentId docId(vespalib::make_string("id:ns:testdoctype1:n=1:%d", i)); - auto doc = std::make_shared<Document>(*_testdoctype1, docId); - auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, 1)), doc, i + 1); - cmd->setAddress(address); - top.sendDown(cmd); - } - - top.waitForMessages(3, _waitTime); - top.reset(); - - getDummyPersistence().simulateMaintenanceFailure(); - - auto cmd = std::make_shared<RepairBucketCommand>(makeDocumentBucket(document::BucketId(16, 1)), 0); - top.sendDown(cmd); - - top.waitForMessages(2, _waitTime); - - EXPECT_EQ( - std::string("NotifyBucketChangeCommand(BucketId(0x4000000000000001), " - "BucketInfo(crc 0xa14e7e3f, docCount 2, totDocSize 174, " - "ready true, active false))"), top.getReply(0)->toString()); - - top.close(); -} - TEST_F(FileStorManagerTest, flush) { // Setting up manager DummyStorageLink top; diff --git a/storage/src/tests/storageserver/CMakeLists.txt b/storage/src/tests/storageserver/CMakeLists.txt index 9b63f5054c0..1d759a534f6 100644 --- a/storage/src/tests/storageserver/CMakeLists.txt +++ b/storage/src/tests/storageserver/CMakeLists.txt @@ -10,7 +10,6 @@ vespa_add_library(storage_teststorageserver TEST vespa_add_executable(storage_storageserver_gtest_runner_app TEST SOURCES bouncertest.cpp - bucketintegritycheckertest.cpp changedbucketownershiphandlertest.cpp communicationmanagertest.cpp configurable_bucket_resolver_test.cpp diff --git a/storage/src/tests/storageserver/bucketintegritycheckertest.cpp b/storage/src/tests/storageserver/bucketintegritycheckertest.cpp deleted file mode 100644 index 8a68adf226c..00000000000 --- a/storage/src/tests/storageserver/bucketintegritycheckertest.cpp +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/storage/storageserver/bucketintegritychecker.h> -#include <vespa/storageapi/message/persistence.h> -#include <tests/common/testhelper.h> -#include <tests/common/dummystoragelink.h> -#include <vespa/vespalib/io/fileutil.h> -#include <tests/common/teststorageapp.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/time.h> -#include <thread> - -using namespace ::testing; - -namespace storage { - -struct BucketIntegrityCheckerTest : public Test { - std::unique_ptr<vdstestlib::DirConfig> _config; - std::unique_ptr<TestServiceLayerApp> _node; - int _timeout; // Timeout in seconds before aborting - - void SetUp() override { - _timeout = 60*2; - _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true)); - _node = std::make_unique<TestServiceLayerApp>( - DiskCount(256), NodeIndex(0), _config->getConfigId()); - } -}; - -TEST_F(BucketIntegrityCheckerTest, config) { - // Verify that config is read correctly. Given config should not use - // any default values. - vdstestlib::DirConfig::Config& config( - _config->getConfig("stor-integritychecker")); - config.set("dailycyclestart", "60"); - config.set("dailycyclestop", "360"); - config.set("weeklycycle", "crRc-rc"); - config.set("maxpending", "2"); - config.set("mincycletime", "120"); - config.set("requestdelay", "5"); - - BucketIntegrityChecker checker(_config->getConfigId(), - _node->getComponentRegister()); - checker.setMaxThreadWaitTime(framework::MilliSecTime(10)); - SchedulingOptions& opt(checker.getSchedulingOptions()); - EXPECT_EQ(60u, opt._dailyCycleStart); - EXPECT_EQ(360u, opt._dailyCycleStop); - EXPECT_EQ(SchedulingOptions::CONTINUE, opt._dailyStates[0]); - EXPECT_EQ(SchedulingOptions::RUN_CHEAP, opt._dailyStates[1]); - EXPECT_EQ(SchedulingOptions::RUN_FULL, opt._dailyStates[2]); - EXPECT_EQ(SchedulingOptions::CONTINUE, opt._dailyStates[3]); - EXPECT_EQ(SchedulingOptions::DONT_RUN, opt._dailyStates[4]); - EXPECT_EQ(SchedulingOptions::RUN_CHEAP, opt._dailyStates[5]); - EXPECT_EQ(SchedulingOptions::CONTINUE, opt._dailyStates[6]); - EXPECT_EQ(2u, opt._maxPendingCount); - EXPECT_EQ(framework::SecondTime(7200), opt._minCycleTime); - EXPECT_EQ(framework::SecondTime(5), opt._requestDelay); -} - -namespace { - /** - * Calculate a date based on the following format: - * week<#> <day> <hh>:<mm>:<ss> - * Examples: "week3 mon 00:30:00" - * "week3 tue 04:20:00" - * "week9 thi 14:00:24" - */ - time_t getDate(const std::string& datestring) { - vespalib::string rest(datestring); - int spacePos = rest.find(' '); - uint32_t week = strtoul(rest.substr(4, spacePos-4).c_str(), NULL, 0); - rest = rest.substr(spacePos+1); - vespalib::string wday(rest.substr(0,3)); - rest = rest.substr(4); - uint32_t hours = strtoul(rest.substr(0, 2).c_str(), NULL, 0); - uint32_t minutes = strtoul(rest.substr(3, 2).c_str(), NULL, 0); - uint32_t seconds = strtoul(rest.substr(6, 2).c_str(), NULL, 0); - uint32_t day(0); - if (wday == "mon") { day = 1; } - else if (wday == "tue") { day = 2; } - else if (wday == "wed") { day = 3; } - else if (wday == "thi") { day = 4; } - else if (wday == "fri") { day = 5; } - else if (wday == "sat") { day = 6; } - else if (wday == "sun") { day = 0; } - else { assert(false); } - // Create a start time that points to the start of some week. - // A random sunday 00:00:00, which we will use as start of time - struct tm mytime; - memset(&mytime, 0, sizeof(mytime)); - mytime.tm_year = 2008 - 1900; - mytime.tm_mon = 0; - mytime.tm_mday = 1; - mytime.tm_hour = 0; - mytime.tm_min = 0; - mytime.tm_sec = 0; - time_t startTime = timegm(&mytime); - assert(gmtime_r(&startTime, &mytime)); - while (mytime.tm_wday != 0) { - ++mytime.tm_mday; - startTime = timegm(&mytime); - assert(gmtime_r(&startTime, &mytime)); - } - // Add the wanted values to the start time - time_t resultTime = startTime; - resultTime += week * 7 * 24 * 60 * 60 - + day * 24 * 60 * 60 - + hours * 60 * 60 - + minutes * 60 - + seconds; - // std::cerr << "Time requested " << datestring << ". Got time " - // << framework::SecondTime(resultTime).toString() << "\n"; - return resultTime; - } - - void addBucketToDatabase(TestServiceLayerApp& server, - const document::BucketId& id, uint8_t disk, - uint32_t numDocs, uint32_t crc, uint32_t totalSize) - { - bucketdb::StorageBucketInfo info; - info.setBucketInfo(api::BucketInfo(crc, numDocs, totalSize)); - info.disk = disk; - server.getStorageBucketDatabase().insert(id, info, "foo"); - } - - - /** - * In tests wanting to only have one pending, only add buckets for one disk - * as pending is per disk. If so set singleDisk true. - */ - void addBucketsToDatabase(TestServiceLayerApp& server, bool singleDisk) { - addBucketToDatabase(server, document::BucketId(16, 0x123), 0, - 14, 0x123, 1024); - addBucketToDatabase(server, document::BucketId(16, 0x234), 0, - 18, 0x234, 1024); - addBucketToDatabase(server, document::BucketId(16, 0x345), 0, - 11, 0x345, 2048); - addBucketToDatabase(server, document::BucketId(16, 0x456), 0, - 13, 0x456, 1280); - if (!singleDisk) { - addBucketToDatabase(server, document::BucketId(16, 0x567), 1, - 20, 0x567, 4096); - addBucketToDatabase(server, document::BucketId(16, 0x987), 254, - 8, 0x987, 65536); - } - } -} - -#define ASSERT_COMMAND_COUNT(count, dummylink) \ - { \ - std::ostringstream msgost; \ - if ((dummylink).getNumCommands() != count) { \ - for (uint32_t ijx=0; ijx<(dummylink).getNumCommands(); ++ijx) { \ - msgost << (dummylink).getCommand(ijx)->toString(true) << "\n"; \ - } \ - } \ - ASSERT_EQ(size_t(count), (dummylink).getNumCommands()) << msgost.str(); \ - } - -TEST_F(BucketIntegrityCheckerTest, basic_functionality) { - _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:00:00")); - addBucketsToDatabase(*_node, false); - DummyStorageLink* dummyLink = 0; - { - std::unique_ptr<BucketIntegrityChecker> midLink( - new BucketIntegrityChecker("", _node->getComponentRegister())); - BucketIntegrityChecker& checker(*midLink); - checker.setMaxThreadWaitTime(framework::MilliSecTime(10)); - // Setup and start checker - DummyStorageLink topLink; - topLink.push_back(StorageLink::UP(midLink.release())); - checker.push_back(std::unique_ptr<StorageLink>( - dummyLink = new DummyStorageLink())); - checker.getSchedulingOptions()._maxPendingCount = 2; - checker.getSchedulingOptions()._minCycleTime = framework::SecondTime(60 * 60); - topLink.open(); - // Waiting for system to be initialized - std::this_thread::sleep_for(10ms); // Give next message chance to come - ASSERT_COMMAND_COUNT(0, *dummyLink); - topLink.doneInit(); - checker.bump(); - // Should have started new run with 2 pending per disk - dummyLink->waitForMessages(4, _timeout); - std::this_thread::sleep_for(10ms); // Give 5th message chance to come - ASSERT_COMMAND_COUNT(4, *dummyLink); - auto* cmd1 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(0).get()); - EXPECT_EQ(230, cmd1->getPriority()); - ASSERT_TRUE(cmd1); - EXPECT_EQ(document::BucketId(16, 0x234), cmd1->getBucketId()); - auto* cmd2 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(1).get()); - ASSERT_TRUE(cmd2); - EXPECT_EQ(document::BucketId(16, 0x456), cmd2->getBucketId()); - auto* cmd3 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(2).get()); - ASSERT_TRUE(cmd3); - EXPECT_EQ(document::BucketId(16, 0x567), cmd3->getBucketId()); - auto* cmd4 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(3).get()); - ASSERT_TRUE(cmd4); - EXPECT_EQ(document::BucketId(16, 0x987), cmd4->getBucketId()); - - // Answering a message on disk with no more buckets does not trigger new - auto reply1 = std::make_shared<RepairBucketReply>(*cmd3); - ASSERT_TRUE(checker.onUp(reply1)); - std::this_thread::sleep_for(10ms); // Give next message chance to come - ASSERT_COMMAND_COUNT(4, *dummyLink); - // Answering a message on disk with more buckets trigger new repair - auto reply2 = std::make_shared<RepairBucketReply>(*cmd2); - ASSERT_TRUE(checker.onUp(reply2)); - dummyLink->waitForMessages(5, _timeout); - std::this_thread::sleep_for(10ms); // Give 6th message chance to come - ASSERT_COMMAND_COUNT(5, *dummyLink); - auto* cmd5 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(4).get()); - ASSERT_TRUE(cmd5); - EXPECT_EQ(document::BucketId(16, 0x345), cmd5->getBucketId()); - // Fail a repair, causing it to be resent later, but first continue - // with other bucket. - auto reply3 = std::make_shared<RepairBucketReply>(*cmd1); - reply3->setResult(api::ReturnCode(api::ReturnCode::IGNORED)); - ASSERT_TRUE(checker.onUp(reply3)); - dummyLink->waitForMessages(6, _timeout); - std::this_thread::sleep_for(10ms); // Give 7th message chance to come - ASSERT_COMMAND_COUNT(6, *dummyLink); - auto* cmd6 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(5).get()); - ASSERT_TRUE(cmd6); - EXPECT_EQ(document::BucketId(16, 0x123), cmd6->getBucketId()); - // Fail a repair with not found. That is an acceptable return code. - // (No more requests as this was last for that disk) - auto reply4 = std::make_shared<RepairBucketReply>(*cmd4); - reply3->setResult(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); - ASSERT_TRUE(checker.onUp(reply4)); - std::this_thread::sleep_for(10ms); // Give 7th message chance to come - ASSERT_COMMAND_COUNT(6, *dummyLink); - - // Send a repair reply that actually have corrected the bucket. - api::BucketInfo newInfo(0x3456, 4, 8192); - auto reply5 = std::make_shared<RepairBucketReply>(*cmd5, newInfo); - reply5->setAltered(true); - ASSERT_TRUE(checker.onUp(reply5)); - - // Finish run. New iteration should not start yet as min - // cycle time has not passed - auto reply6 = std::make_shared<RepairBucketReply>(*cmd6); - ASSERT_TRUE(checker.onUp(reply6)); - dummyLink->waitForMessages(7, _timeout); - ASSERT_COMMAND_COUNT(7, *dummyLink); - auto* cmd7 = dynamic_cast<RepairBucketCommand*>(dummyLink->getCommand(6).get()); - ASSERT_TRUE(cmd7); - EXPECT_EQ(document::BucketId(16, 0x234), cmd7->getBucketId()); - auto reply7 = std::make_shared<RepairBucketReply>(*cmd7); - ASSERT_TRUE(checker.onUp(reply7)); - std::this_thread::sleep_for(10ms); // Give 8th message chance to come - ASSERT_COMMAND_COUNT(7, *dummyLink); - - // Still not time for next iteration - dummyLink->reset(); - _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 00:59:59")); - std::this_thread::sleep_for(10ms); // Give new run chance to start - ASSERT_COMMAND_COUNT(0, *dummyLink); - - // Pass time until next cycle should start - dummyLink->reset(); - _node->getClock().setAbsoluteTimeInSeconds(getDate("week1 sun 01:00:00")); - dummyLink->waitForMessages(4, _timeout); - ASSERT_COMMAND_COUNT(4, *dummyLink); - } -} - -} // storage diff --git a/storage/src/vespa/storage/common/bucketmessages.cpp b/storage/src/vespa/storage/common/bucketmessages.cpp index 1a4dc61a3ce..f953428fc8a 100644 --- a/storage/src/vespa/storage/common/bucketmessages.cpp +++ b/storage/src/vespa/storage/common/bucketmessages.cpp @@ -106,70 +106,6 @@ std::unique_ptr<api::StorageReply> ReadBucketInfo::makeReply() { return std::make_unique<ReadBucketInfoReply>(*this); } - -RepairBucketCommand::RepairBucketCommand(const document::Bucket &bucket, uint16_t disk) - : api::InternalCommand(ID), - _bucket(bucket), - _disk(disk), - _verifyBody(false), - _moveToIdealDisk(false) -{ - setPriority(LOW); -} - -RepairBucketCommand::~RepairBucketCommand() = default; - -void -RepairBucketCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { - out << getSummary(); - if (verbose) { - out << " : "; - InternalCommand::print(out, true, indent); - } -} - -void -RepairBucketCommand::setBucketId(const document::BucketId& id) -{ - document::Bucket newBucket(_bucket.getBucketSpace(), id); - _bucket = newBucket; -} - -vespalib::string -RepairBucketCommand::getSummary() const { - vespalib::asciistream s; - s << "ReadBucketInfo(" << _bucket.toString() << ", disk " << _disk - << (_verifyBody ? ", verifying body" : "") - << (_moveToIdealDisk ? ", moving to ideal disk" : "") - << ")"; - return s.str(); -} - -RepairBucketReply::RepairBucketReply(const RepairBucketCommand& cmd, const api::BucketInfo& bucketInfo) - : api::InternalReply(ID, cmd), - _bucket(cmd.getBucket()), - _bucketInfo(bucketInfo), - _disk(cmd.getDisk()), - _altered(false) -{ } - -RepairBucketReply::~RepairBucketReply() = default; - -void -RepairBucketReply::print(std::ostream& out, bool verbose, const std::string& indent) const { - out << "RepairBucketReply()"; - - if (verbose) { - out << " : "; - InternalReply::print(out, true, indent); - } -} - -std::unique_ptr<api::StorageReply> -RepairBucketCommand::makeReply() { - return std::make_unique<RepairBucketReply>(*this); -} - BucketDiskMoveCommand::BucketDiskMoveCommand(const document::Bucket &bucket, uint16_t srcDisk, uint16_t dstDisk) : api::InternalCommand(ID), diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h index 941928b1064..fed944fb217 100644 --- a/storage/src/vespa/storage/common/bucketmessages.h +++ b/storage/src/vespa/storage/common/bucketmessages.h @@ -114,76 +114,6 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; - -/** - * @class RepairBucketCommand - * @ingroup common - * - * @brief Repair a given bucket (if it contain errors). - * - * This message is sent continually by the bucket integrity checker. - * Errors found are reported back. - */ -class RepairBucketCommand : public api::InternalCommand { - document::Bucket _bucket; - uint16_t _disk; - bool _verifyBody; // Optional as it is expensive - bool _moveToIdealDisk; // Optional as it is expensive - -public: - typedef std::unique_ptr<RepairBucketCommand> UP; - - static const uint32_t ID = 2007; - - RepairBucketCommand(const document::Bucket &bucket, uint16_t disk); - ~RepairBucketCommand(); - - bool hasSingleBucketId() const override { return true; } - document::Bucket getBucket() const override { return _bucket; } - - uint16_t getDisk() const { return _disk; } - bool verifyBody() const { return _verifyBody; } - bool moveToIdealDisk() const { return _moveToIdealDisk; } - - void setBucketId(const document::BucketId& id); - void verifyBody(bool doIt) { _verifyBody = doIt; } - void moveToIdealDisk(bool doIt) { _moveToIdealDisk = doIt; } - - std::unique_ptr<api::StorageReply> makeReply() override; - - void print(std::ostream& out, bool verbose, const std::string& indent) const override; -private: - vespalib::string getSummary() const override; -}; - -/** - * @class RepairBucketReply - * @ingroup common - */ -class RepairBucketReply : public api::InternalReply { - document::Bucket _bucket; - api::BucketInfo _bucketInfo; - uint16_t _disk; - bool _altered; - -public: - typedef std::unique_ptr<RepairBucketReply> UP; - static const uint32_t ID = 2008; - - RepairBucketReply(const RepairBucketCommand& cmd, const api::BucketInfo& bucketInfo = api::BucketInfo()); - ~RepairBucketReply(); - document::Bucket getBucket() const override { return _bucket; } - bool hasSingleBucketId() const override { return true; } - - const api::BucketInfo& getBucketInfo() const { return _bucketInfo; } - uint16_t getDisk() const { return _disk; } - - bool bucketAltered() const { return _altered; } - void setAltered(bool altered) { _altered = altered; } - - void print(std::ostream& out, bool verbose, const std::string& indent) const override; -}; - /** * @class BucketDiskMoveCommand * @ingroup common diff --git a/storage/src/vespa/storage/common/messagebucket.cpp b/storage/src/vespa/storage/common/messagebucket.cpp index 4a6f638262d..73c0827c05b 100644 --- a/storage/src/vespa/storage/common/messagebucket.cpp +++ b/storage/src/vespa/storage/common/messagebucket.cpp @@ -64,8 +64,6 @@ getStorageMessageBucket(const api::StorageMessage& msg) return static_cast<const ReadBucketList&>(msg).getBucket(); case ReadBucketInfo::ID: return static_cast<const ReadBucketInfo&>(msg).getBucket(); - case RepairBucketCommand::ID: - return static_cast<const RepairBucketCommand&>(msg).getBucket(); case BucketDiskMoveCommand::ID: return static_cast<const BucketDiskMoveCommand&>(msg).getBucket(); case InternalBucketJoinCommand::ID: diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 5ad37d585ef..e23d65f72ac 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -648,11 +648,6 @@ FileStorHandlerImpl::remapMessage(api::StorageMessage& msg, const document::Buck break; case GetIterCommand::ID: bucket = static_cast<GetIterCommand&>(msg).getBucket(); - [[fallthrough]]; - case RepairBucketCommand::ID: - if (bucket.getBucketId().getRawId() == 0) { - bucket = static_cast<RepairBucketCommand&>(msg).getBucket(); - } // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE if (bucket == source) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 6eab44923f7..59ae6b83f56 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -692,15 +692,6 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } return true; } - case RepairBucketCommand::ID: - { - shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); - if (entry.exist()) { - handlePersistenceMessage(cmd, entry->disk); - } - return true; - } case BucketDiskMoveCommand::ID: { shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg)); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index e663aa4a366..1368b14077a 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -186,32 +186,6 @@ PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) } MessageTracker::UP -PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.repairs); - NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), - (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); - api::BucketInfo before = _env.getBucketInfo(cmd.getBucket()); - spi::Result result = _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - cmd.verifyBody() ? spi::HIGH : spi::LOW); - if (tracker->checkForError(result)) { - api::BucketInfo after = _env.getBucketInfo(cmd.getBucket()); - - RepairBucketReply::UP reply(new RepairBucketReply(cmd, after)); - reply->setAltered(!(after == before)); - if (reply->bucketAltered()) { - notifyGuard.notifyAlways(cmd.getBucket(), after); - _env._metrics.repairFixed.inc(); - } - - _env.updateBucketDatabase(cmd.getBucket(), after); - tracker->setReply(api::StorageReply::SP(reply.release())); - } - return tracker; -} - -MessageTracker::UP PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) { tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); @@ -750,8 +724,6 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker)); case ReadBucketInfo::ID: return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); - case RepairBucketCommand::ID: - return handleRepairBucket(static_cast<RepairBucketCommand&>(msg), std::move(tracker)); case BucketDiskMoveCommand::ID: return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), std::move(tracker)); case InternalBucketJoinCommand::ID: diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 156c6a0496e..773732b5ef1 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -43,7 +43,6 @@ public: MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker); private: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ed3de5c7873..a86edd359fc 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -161,13 +161,6 @@ ProviderErrorWrapper::getModifiedBuckets(BucketSpace bucketSpace) const } spi::Result -ProviderErrorWrapper::maintain(const spi::Bucket& bucket, - spi::MaintenanceLevel level) -{ - return checkResult(_impl.maintain(bucket, level)); -} - -spi::Result ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 61664419c69..23da566afee 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -59,7 +59,6 @@ public: spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - spi::Result maintain(const spi::Bucket& bucket, spi::MaintenanceLevel level) override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; spi::Result move(const spi::Bucket& source, spi::PartitionId target, spi::Context&) override; diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 1aafe58af6c..606d61ab944 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(storage_storageserver SOURCES bouncer.cpp bouncer_metrics.cpp - bucketintegritychecker.cpp changedbucketownershiphandler.cpp communicationmanager.cpp communicationmanagermetrics.cpp diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp b/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp deleted file mode 100644 index 650637f206d..00000000000 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.cpp +++ /dev/null @@ -1,637 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bucketintegritychecker.h" -#include <vespa/storage/common/bucketmessages.h> -#include <vespa/storage/bucketdb/storbucketdb.h> -#include <vespa/storageapi/message/state.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/storage/bucketdb/lockablemap.hpp> -#include <vespa/config/common/exceptions.h> -#include <vespa/vespalib/util/exceptions.h> - -#include <vespa/log/bufferedlogger.h> -LOG_SETUP(".bucketintegritychecker"); - -using std::shared_ptr; -using document::BucketSpace; - -namespace storage { - -namespace { - - std::string printMinutesOfDay(uint32_t minutesOfDay) { - std::ostringstream ost; - uint32_t hours = minutesOfDay / 60; - uint32_t minutes = minutesOfDay % 60; - ost << (hours >= 10 ? hours / 10 : 0) << hours % 10 << ':' - << (minutes >= 10 ? minutes / 10 : 0) << minutes % 10; - return ost.str(); - } - - std::string printRunState(SchedulingOptions::RunState state) { - switch (state) { - case SchedulingOptions::DONT_RUN: - return "Not running"; - case SchedulingOptions::RUN_FULL: - return "Running with full verification"; - case SchedulingOptions::RUN_CHEAP: - return "Running with cheap verification"; - case SchedulingOptions::CONTINUE: - return "Continuing any existing run"; - default: - LOG_ABORT("should not be reached"); - } - } -} - -void -SchedulingOptions::print(std::ostream& out, bool verbose, - const std::string& indent) const -{ - (void) verbose; - std::string ind = indent + " "; - out << "SchedulingOptions(" - << "Daily cycle " << printMinutesOfDay(_dailyCycleStart) - << " - " << printMinutesOfDay(_dailyCycleStop) - << ",\n" << ind << "Weekly cycle" - << "\n" << ind << " Monday - " << printRunState(_dailyStates[1]) - << "\n" << ind << " Tuesday - " << printRunState(_dailyStates[2]) - << "\n" << ind << " Wednesday - " << printRunState(_dailyStates[3]) - << "\n" << ind << " Thursday - " << printRunState(_dailyStates[4]) - << "\n" << ind << " Friday - " << printRunState(_dailyStates[5]) - << "\n" << ind << " Saturday - " << printRunState(_dailyStates[6]) - << "\n" << ind << " Sunday - " << printRunState(_dailyStates[0]) - << ",\n" << ind << "Max pending count " << _maxPendingCount - << ",\n" << ind << "Min cycle time " - << printMinutesOfDay(_minCycleTime.getTime() / 60) - << ",\n" << ind << "Request delay" << _requestDelay << " seconds." - << "\n" << indent << ")"; -} - -bool -BucketIntegrityChecker::DiskData::done() const -{ - return (state == DONE && failedRepairs.empty() && pendingCount == 0); -} - -bool -BucketIntegrityChecker::DiskData::working() const -{ - return (state == IN_PROGRESS || !failedRepairs.empty() - || pendingCount != 0); -} - -// Utilities to find next bucket in bucket list from a possibly non-existing one -namespace { - -struct NextEntryFinder { - bool _first; - uint8_t _disk; - document::BucketId _last; - std::unique_ptr<document::BucketId> _next; - - NextEntryFinder(const document::BucketId& id, uint8_t disk) - : _first(true), _disk(disk), _last(id), _next() {} - - StorBucketDatabase::Decision operator()(document::BucketId::Type bucketId, - StorBucketDatabase::Entry& entry) - { - document::BucketId bucket(document::BucketId::keyToBucketId(bucketId)); - - if (entry.disk != _disk) { - return StorBucketDatabase::CONTINUE; - } else if (_first && bucket == _last) { - _first = false; - return StorBucketDatabase::CONTINUE; - } else { - _next = std::make_unique<document::BucketId>(bucket); - return StorBucketDatabase::ABORT; - } - } -}; - -std::unique_ptr<document::BucketId> getNextId(StorBucketDatabase& database, - const document::BucketId& last, - uint8_t disk) -{ - NextEntryFinder proc(last, disk); - database.each(proc, "BucketIntegrityChecker::getNextId", last.toKey()); - return std::move(proc._next); -} - -bool allBucketSpacesExhausted(size_t index, const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces) noexcept { - return (index == bucketSpaces.size() - 1); -} - -} // End of anonymous namespace - -document::Bucket -BucketIntegrityChecker::DiskData::iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces, - const ContentBucketSpaceRepo& bucketSpaceRepo) -{ - static uint32_t i=0; - // Resend failed buckets once in a while - if (!failedRepairs.empty() && ++i % 10 == 9) { - document::Bucket bucket(failedRepairs.front()); - LOG(spam, "Scheduling next bucket %s from failed repairs list", - bucket.toString().c_str()); - failedRepairs.pop_front(); - ++retriedBuckets; - return bucket; - } - if (state == NOT_STARTED) { - // Guaranteed to be before all buckets. - currentBucket = document::BucketId(0, 0); - currentBucketSpaceIndex = 0; - } - while (state != DONE) { - const auto currentSpace = bucketSpaces[currentBucketSpaceIndex]; - const auto bid = getNextId(bucketSpaceRepo.get(currentSpace).bucketDatabase(), currentBucket, disk); - if (bid) { - state = IN_PROGRESS; - currentBucket = *bid; - return document::Bucket(currentSpace, currentBucket); - } else if (allBucketSpacesExhausted(currentBucketSpaceIndex, bucketSpaces)) { - state = DONE; - break; - } else { - ++currentBucketSpaceIndex; - currentBucket = document::BucketId(0, 0); - } - } - // If we didn't schedule repaired, but we ended up not having any other, - // take repaired once anyways - if (!failedRepairs.empty()) { - document::Bucket bucket(failedRepairs.front()); - LOG(spam, "Done iterating, scheduling next bucket %s from failed " - "repairs list", bucket.toString().c_str()); - failedRepairs.pop_front(); - ++retriedBuckets; - return bucket; - } - return document::Bucket(bucketSpaces[currentBucketSpaceIndex], document::BucketId(0, 0)); -} - -BucketIntegrityChecker::BucketIntegrityChecker( - const config::ConfigUri & configUri, - ServiceLayerComponentRegister& compReg) - : StorageLinkQueued("Bucket integrity checker", compReg), - Runnable(), - framework::HtmlStatusReporter("bucketintegritychecker", - "Bucket integrity checker"), - _component(compReg, "bucketintegritychecker"), - _cycleCount(0), - _bucketSpaces(_component.getBucketSpaceRepo().getBucketSpaces()), - _status(), - _lastCycleStart(0), - _cycleStartBucketCount(0), - _lastResponseTime(0), - _lastCycleCompleted(true), - _currentRunWithFullVerification(false), - _verifyAllRepairs(false), - _scheduleOptions(), - _wait(), - _configFetcher(configUri.getContext()), - _maxThreadWaitTime(60 * 1000) -{ - assert(!_bucketSpaces.empty()); - LOG(debug, "Configuring bucket integrity checker to work with %u disks.", - _component.getDiskCount()); - _status.resize(_component.getDiskCount()); - for (uint16_t i=0; i<_component.getDiskCount(); ++i) { - _status[i].disk = i; - } - if (_status.empty()) { - throw vespalib::IllegalStateException( - "Cannot have storage with no disks.", VESPA_STRLOC); - } - // Register for config. Normally not critical, so catching config - // exception allowing program to continue if missing/faulty config. - try{ - if (!configUri.empty()) { - _configFetcher.subscribe<vespa::config::content::core::StorIntegritycheckerConfig>(configUri.getConfigId(), this); - _configFetcher.start(); - } else { - LOG(info, "No config id specified. Using defaults rather than " - "config"); - } - } catch (config::InvalidConfigException& e) { - LOG(info, "Bucket Integrity Checker failed to load config '%s'. This " - "is not critical since it has sensible defaults: %s", - configUri.getConfigId().c_str(), e.what()); - } - _component.registerStatusPage(*this); -} - -BucketIntegrityChecker::~BucketIntegrityChecker() -{ - // This can happen during unit testing - if (StorageLink::getState() == StorageLink::OPENED) { - LOG(error, "BucketIntegrityChecker deleted without calling close() " - "first"); - close(); - flush(); - } - closeNextLink(); -} - -void -BucketIntegrityChecker::onClose() -{ - // Avoid getting config during shutdown - _configFetcher.close(); - // Close thread to ensure we don't send anything more down after - if (_thread) { - LOG(debug, "Waiting for bucket integrity worker thread to close."); - _thread->interruptAndJoin(&_wait); - LOG(debug, "Bucket integrity worker thread closed."); - } - StorageLinkQueued::onClose(); -} - -void BucketIntegrityChecker::bump() const { - vespalib::MonitorGuard monitor(_wait); - monitor.signal(); -} - -bool BucketIntegrityChecker::isWorkingOnCycle() const { - vespalib::MonitorGuard monitor(_wait); - for (uint32_t i=0; i<_status.size(); ++i) { - if (_status[i].working()) return true; - } - return (!_lastCycleCompleted); -} - -uint32_t BucketIntegrityChecker::getCycleCount() const { - vespalib::MonitorGuard monitor(_wait); - return _cycleCount; -} - -void -BucketIntegrityChecker::print(std::ostream& out, bool verbose, - const std::string& indent) const -{ - (void) verbose; (void) indent; - out << "BucketIntegrityChecker"; -} - -void -BucketIntegrityChecker::configure( - std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig> config) -{ - SchedulingOptions options; - options._dailyCycleStart = config->dailycyclestart; - options._dailyCycleStop = config->dailycyclestop; - options._maxPendingCount = config->maxpending; - options._minCycleTime = framework::SecondTime(60 * config->mincycletime); - options._requestDelay = framework::SecondTime(config->requestdelay); - std::string states = config->weeklycycle; - if (states.size() != 7) { - LOG(warning, "Not using integritychecker config: weeklycycle must " - "contain 7 characters, one for each week. Retrieved value:" - " '%s'.", states.c_str()); - return; - } - for (uint32_t i=0; i<7; ++i) { - switch (states[i]) { - case 'R': - options._dailyStates[i] = SchedulingOptions::RUN_FULL; break; - case 'r': - options._dailyStates[i] = SchedulingOptions::RUN_CHEAP; break; - case 'c': - options._dailyStates[i] = SchedulingOptions::CONTINUE; break; - case '-': - options._dailyStates[i] = SchedulingOptions::DONT_RUN; break; default: - LOG(warning, "Not using integritychecker config: weeklycycle " - "contained illegal character %c.", states[i]); - return; - } - } - if (options._dailyCycleStart >= 24*60) { - LOG(warning, "Not using integritychecker config: dailycyclestart " - "is minutes since midnight and must be less than %u. " - "%u is out of range.", 24*60, options._dailyCycleStart); - return; - } - if (options._dailyCycleStop >= 24*60) { - LOG(warning, "Not using integritychecker config: dailycyclestart " - "is minutes since midnight and must be less than %u. " - "%u is out of range.", 24*60, options._dailyCycleStart); - return; - } - if (options._maxPendingCount > 1024) { - LOG(warning, "integritychecker config: Values above 1024 not " - "accepted. Got %u.", options._maxPendingCount); - return; - } - if (options._requestDelay > framework::SecondTime(60*60)) { - LOG(warning, "With a %" PRIu64 " second delay between each bucket " - "verification actually finishing a cycle will take a very " - "long time.", - options._requestDelay.getTime()); - } - vespalib::MonitorGuard monitor(_wait); - if (options._minCycleTime.getMillis() < _maxThreadWaitTime) { - _maxThreadWaitTime = framework::MilliSecTime(1000); - monitor.signal(); - } else { - _maxThreadWaitTime = framework::MilliSecTime(60 * 1000); - } - _scheduleOptions = options; -} - -void BucketIntegrityChecker::onDoneInit() -{ - framework::MilliSecTime maxProcessingTime(60 * 1000); - _thread = _component.startThread( - *this, maxProcessingTime, _maxThreadWaitTime); -} - -bool -BucketIntegrityChecker::onInternalReply( - const std::shared_ptr<api::InternalReply>& internalReply) -{ - // We only care about repair bucket replies - auto reply = std::dynamic_pointer_cast<RepairBucketReply>(internalReply); - if (!reply) { - return false; - } - - vespalib::MonitorGuard monitor(_wait); - _lastResponseTime = _component.getClock().getTimeInSeconds(); - uint8_t disk = reply->getDisk(); - assert(disk < _status.size()); - --_status[disk].pendingCount; - LOG(spam, "Got repair reply for bucket %s: %s. %u messages still pending " - "for disk %u. Bucket altered ? %s", - reply->getBucket().toString().c_str(), - reply->getResult().toString().c_str(), - _status[disk].pendingCount, disk, - (reply->bucketAltered() ? "true" : "false")); - if (reply->getResult().success()) { - LOG(spam, "Repaired handled ok"); - ++_status[disk].checkedBuckets; - if (_status[disk].done()) { - bool completed = true; - for (uint32_t i=0; i<_status.size(); ++i) { - if (!_status[i].done()) { - completed = false; - break; - } - } - _lastCycleCompleted = completed; - } - } else if (reply->getResult().isNonCriticalForIntegrityChecker()) { - ++_status[disk].checkedBuckets; - LOGBP(debug, "Failed to repair bucket %s due to aborting request. " - "Likely bucket split/join or storage shutting down: %s", - reply->getBucket().toString().c_str(), - reply->getResult().toString().c_str()); - } else { - _status[disk].failedRepairs.push_back(reply->getBucket()); - LOGBP(warning, "Failed to perform maintenance on bucket %s, " - "scheduled to be retried: %s", - reply->getBucket().toString().c_str(), - reply->getResult().toString().c_str()); - } - if (_lastCycleCompleted) { - LOG(info, "Completed bucket integrity check cycle"); - } - monitor.signal(); - return true; -} - -SchedulingOptions::RunState -BucketIntegrityChecker::getCurrentRunState( - framework::SecondTime currentTime) const -{ - time_t currTime = currentTime.getTime(); - struct tm date; - struct tm* dateptr = ::gmtime_r(&currTime, &date); - assert(dateptr); - (void) dateptr; - // Get initial state based on weekday - SchedulingOptions::RunState state( - _scheduleOptions._dailyStates[date.tm_wday]); - uint32_t minutesOfDay = 60 * date.tm_hour + date.tm_min; - if (( - _scheduleOptions._dailyCycleStart < _scheduleOptions._dailyCycleStop - && _scheduleOptions._dailyCycleStart <= minutesOfDay - && _scheduleOptions._dailyCycleStop > minutesOfDay - ) || ( - _scheduleOptions._dailyCycleStart >= _scheduleOptions._dailyCycleStop - && (_scheduleOptions._dailyCycleStart <= minutesOfDay - || _scheduleOptions._dailyCycleStop > minutesOfDay) - ) - ) - { // If we're within region in day that we can run. - if (state == SchedulingOptions::CONTINUE) { - // If we're in a continue state, set runstate if there's a current - // run active that isn't completed yet, don't run otherwise. - state = (_lastCycleCompleted - ? SchedulingOptions::DONT_RUN - : (_currentRunWithFullVerification - ? SchedulingOptions::RUN_FULL - : SchedulingOptions::RUN_CHEAP)); - } else if (state == SchedulingOptions::RUN_FULL || - state == SchedulingOptions::RUN_CHEAP) { - // If we're not currently in a run, and it's less than min cycle - // time since last run started, we might not want to run yet. - if (_lastCycleCompleted && - currentTime - _lastCycleStart < _scheduleOptions._minCycleTime) - { - // Unless we didn't do full verification last and want to - // do full verification now, delay run. - if (_currentRunWithFullVerification || - state == SchedulingOptions::RUN_CHEAP) - { - state = SchedulingOptions::DONT_RUN; - } else { - } - } - } - } else { - // If we're outside of time of day boundaries, don't run - state = SchedulingOptions::DONT_RUN; - } - return state; -} - -void -BucketIntegrityChecker::run(framework::ThreadHandle& thread) -{ - while (!thread.interrupted()) { - thread.registerTick(framework::PROCESS_CYCLE); - // Get the state based on the current time. - framework::SecondTime currentTime( - _component.getClock().getTimeInSeconds()); - - vespalib::MonitorGuard monitor(_wait); - SchedulingOptions::RunState state = getCurrentRunState(currentTime); - if (state != SchedulingOptions::RUN_FULL && - state != SchedulingOptions::RUN_CHEAP) - { - // If we dont want to run at this hour, wait. - LOG(spam, "Not in a run state. Waiting."); - monitor.wait(_maxThreadWaitTime.getTime()); - thread.registerTick(framework::WAIT_CYCLE); - } else if (state == SchedulingOptions::RUN_FULL && !_lastCycleCompleted - && !_currentRunWithFullVerification) - { - if (getTotalPendingCount() > 0) { - LOG(spam, "Waiting for last run to get pending to 0, before " - "restarting run to get full verification."); - monitor.wait(_maxThreadWaitTime.getTime()); - thread.registerTick(framework::WAIT_CYCLE); - } else { - LOG(info, "Aborting current verification/repair cycle and " - "starting new one as we at this time want full " - "verification."); - for (uint32_t i=0; i<_status.size(); ++i) { - _status[i].state = DiskData::DONE; - } - _lastCycleCompleted = true; - } - } else if (_scheduleOptions._requestDelay.isSet() - && getTotalPendingCount() > 0) - { - LOG(spam, "Request delay. Waiting for 0 pending before possibly " - "sending new."); - // If request delay is used, we don't send anything new before - // all requests have been received. - monitor.wait(_maxThreadWaitTime.getTime()); - thread.registerTick(framework::WAIT_CYCLE); - } else if (_scheduleOptions._requestDelay.isSet() && - currentTime - _lastResponseTime - < _scheduleOptions._requestDelay) - { - LOG(spam, "Request delay. Waiting given seconds before sending " - "next."); - // If request delay is used and we haven't waited enough, wait more - framework::MilliSecTime delay( - (_scheduleOptions._requestDelay - - (currentTime - _lastResponseTime)).getMillis()); - if (delay > _maxThreadWaitTime) delay = _maxThreadWaitTime; - monitor.wait(std::min(_maxThreadWaitTime.getTime(), delay.getTime())); - thread.registerTick(framework::WAIT_CYCLE); - } else if (_lastCycleCompleted && getTotalPendingCount() > 0) { - LOG(spam, "Completed last cycle. Waiting until we have 0 pending " - "before possibly starting new cycle"); - monitor.wait(_maxThreadWaitTime.getTime()); - thread.registerTick(framework::WAIT_CYCLE); - } else { - LOG(spam, "Sending messages if we have less than max pending. " - "(Currently %u pending total, max is %u per disk)", - getTotalPendingCount(), - _scheduleOptions._maxPendingCount); - // Else we send up to max pending and wait for responses. - if (_lastCycleCompleted) { - for (auto& disk : _status) { - disk.state = DiskData::NOT_STARTED; - disk.failedRepairs.clear(); - disk.checkedBuckets = 0; - disk.retriedBuckets = 0; - } - LOG(info, "Starting new verification/repair cycle at time %s.", - currentTime.toString().c_str()); - _lastCycleStart = currentTime; - _cycleStartBucketCount = 0; - for (auto space : _bucketSpaces) { - _cycleStartBucketCount += _component.getBucketDatabase(space).size(); - } - _lastCycleCompleted = false; - _currentRunWithFullVerification = (state == SchedulingOptions::RUN_FULL); - ++_cycleCount; - } - for (auto& disk : _status) { - while (disk.pendingCount < _scheduleOptions._maxPendingCount) { - auto bucket = disk.iterate(_bucketSpaces, _component.getBucketSpaceRepo()); - if (bucket.getBucketId() == document::BucketId(0, 0)) { - LOG(debug, "Completed repair cycle for disk %u.", disk.disk); - // If there is no next bucket, we might have completed run - bool completed = true; - for (uint32_t j=0; j<_status.size(); ++j) { - if (!_status[j].done()) { - completed = false; - break; - } - } - _lastCycleCompleted = completed; - if (_lastCycleCompleted) { - LOG(debug, "Repair cycle completed for all disks."); - } - break; - } - auto cmd = std::make_shared<RepairBucketCommand>(bucket, disk.disk); - cmd->verifyBody(_currentRunWithFullVerification); - cmd->moveToIdealDisk(true); - cmd->setPriority(230); - LOG(spam, "Sending new repair command for bucket %s. " - "After this, there will be %u pending on disk %u", - bucket.toString().c_str(), - disk.pendingCount + 1, disk.disk); - ++disk.pendingCount; - dispatchDown(cmd); - } - } - monitor.wait(_maxThreadWaitTime.getTime()); - thread.registerTick(framework::WAIT_CYCLE); - } - } -} - -uint32_t -BucketIntegrityChecker::getTotalPendingCount() const -{ - uint32_t total = 0; - for (auto& disk : _status) { - total += disk.pendingCount; - } - return total; -} - -namespace { - template<typename T> - void printRow(std::ostream& out, const std::string& key, const T& val) { - out << "<tr><td>" << key << "</td><td><pre>" << val - << "</pre></td></tr>\n"; - } -} - -void -BucketIntegrityChecker::reportHtmlStatus(std::ostream& out, - const framework::HttpUrlPath&) const -{ - vespalib::MonitorGuard monitor(_wait); - uint32_t totalChecked = 0, totalRetried = 0; - for (uint32_t i=0; i<_status.size(); ++i) { - totalChecked += _status[i].checkedBuckets; - totalRetried += _status[i].retriedBuckets; - } - out << "<table>\n"; - printRow(out, "current status", _lastCycleCompleted - ? "Not running a cycle" : "Running a cycle"); - printRow(out, "pending count", getTotalPendingCount()); - std::string name = (_lastCycleCompleted ? "last" : "current"); - if (_lastCycleStart.isSet()) { - printRow(out, name + " cycle start", _lastCycleStart.toString()); - printRow(out, "buckets checked in " + name + " cycle", - totalChecked); - printRow(out, "buckets retried check in " + name + " cycle", - totalRetried); - printRow(out, "total buckets in database at start of " + name - + " cycle", _cycleStartBucketCount); - if (!_lastCycleCompleted) { - std::ostringstream ost; - ost << (100.0 * totalChecked / _cycleStartBucketCount) << " %"; - printRow(out, "progress", ost.str()); - } - } - if (_lastResponseTime.isSet()) { - printRow(out, "Last response time", _lastResponseTime.toString()); - } - printRow(out, "Schedule options", _scheduleOptions); - out << "</table>\n"; -} - -} // storage diff --git a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h b/storage/src/vespa/storage/storageserver/bucketintegritychecker.h deleted file mode 100644 index fb619a84c46..00000000000 --- a/storage/src/vespa/storage/storageserver/bucketintegritychecker.h +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class storage::BucketIntegrityChecker - * @ingroup storageserver - * - * @brief This class schedules buckets for integrity checks. - * - * @version $Id$ - */ - -#pragma once - -#include <vespa/storage/common/content_bucket_space_repo.h> -#include <vespa/storage/common/servicelayercomponent.h> -#include <vespa/storage/common/storagelinkqueued.h> -#include <vespa/storage/config/config-stor-integritychecker.h> -#include <vespa/storageapi/message/bucket.h> -#include <vespa/storageframework/generic/status/htmlstatusreporter.h> -#include <vespa/config/config.h> -#include <list> - -namespace storage { - -class RepairBucketReply; - -/** Options describing when and how parallel we should run. */ -struct SchedulingOptions : public document::Printable { - /** Time of day to start/resume cycle. Minutes after 00:00. 0 - 24*60-1. */ - uint32_t _dailyCycleStart; - /** Time of day to pause cycle if it's still going. Minutes after 00:00. */ - uint32_t _dailyCycleStop; - - enum RunState { DONT_RUN, RUN_FULL, RUN_CHEAP, CONTINUE }; - /** Which days to run cycle. */ - RunState _dailyStates[7]; - - /** Max pending requests at the same time. */ - uint32_t _maxPendingCount; - /** Minimum time between each cycle. */ - framework::SecondTime _minCycleTime; - /** Seconds delay between requests if max pending == 1. */ - framework::SecondTime _requestDelay; - - SchedulingOptions() - : _dailyCycleStart(0), - _dailyCycleStop(0), - _maxPendingCount(5), - _minCycleTime(24 * 60 * 60), // One day - _requestDelay(0) - { - for (uint32_t i=0; i<7; ++i) { _dailyStates[i] = RUN_FULL; } - } - - void print(std::ostream& out, bool verbose, const std::string& indent) const override; -}; - - -class BucketIntegrityChecker : public StorageLinkQueued, - private framework::Runnable, - public framework::HtmlStatusReporter, - private config::IFetcherCallback<vespa::config::content::core::StorIntegritycheckerConfig> { -public: - struct DiskData { - /** - * State of bucket database iterating. If not started, we should - * take first bucket in bucket db, if in progress, take next after - * currentBucket, and if done, don't do anymore. - */ - enum State { NOT_STARTED, IN_PROGRESS, DONE }; - - size_t currentBucketSpaceIndex; - document::BucketId currentBucket; - uint32_t pendingCount; - State state; - uint8_t disk; - std::list<document::Bucket> failedRepairs; - uint32_t checkedBuckets; - uint32_t retriedBuckets; - - DiskData() - : currentBucketSpaceIndex(0), currentBucket(0), - pendingCount(0), state(NOT_STARTED), disk(255), - checkedBuckets(0), retriedBuckets(0) - {} - - bool done() const; // Whether we're still working on this disk - bool working() const; // Whether we've stated and not finished - /** - * Get the next bucket to repair. If no more to iterate, random bucket - * is returned. Check if done() afterwards. - */ - document::Bucket iterate(const ContentBucketSpaceRepo::BucketSpaces& bucketSpaces, - const ContentBucketSpaceRepo& bucketSpaceRepo); - }; - -private: - ServiceLayerComponent _component; - uint32_t _cycleCount; - ContentBucketSpaceRepo::BucketSpaces _bucketSpaces; - std::vector<DiskData> _status; - framework::SecondTime _lastCycleStart; - uint32_t _cycleStartBucketCount; - framework::SecondTime _lastResponseTime; - bool _lastCycleCompleted; - bool _currentRunWithFullVerification; - bool _verifyAllRepairs; - SchedulingOptions _scheduleOptions; - vespalib::Monitor _wait; - config::ConfigFetcher _configFetcher; - framework::MilliSecTime _maxThreadWaitTime; - framework::Thread::UP _thread; - - BucketIntegrityChecker(const BucketIntegrityChecker &); - BucketIntegrityChecker& operator=(const BucketIntegrityChecker &); - -public: - BucketIntegrityChecker(const config::ConfigUri & configUri, - ServiceLayerComponentRegister&); - ~BucketIntegrityChecker(); - - void onClose() override; - void print(std::ostream& out, bool verbose, const std::string& indent) const override; - SchedulingOptions& getSchedulingOptions() { return _scheduleOptions; } - bool isWorkingOnCycle() const; - uint32_t getCycleCount() const; - - /** Give thread a bump by signalling it. */ - void bump() const; - - void setMaxThreadWaitTime(framework::MilliSecTime milliSecs) { _maxThreadWaitTime = milliSecs; } - - framework::Clock& getClock() { return _component.getClock(); } - -private: - void configure(std::unique_ptr<vespa::config::content::core::StorIntegritycheckerConfig>) override; - void onDoneInit() override; - bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; - bool onNotifyBucketChangeReply(const std::shared_ptr<api::NotifyBucketChangeReply>&) override { return true; } - SchedulingOptions::RunState getCurrentRunState(framework::SecondTime time) const; - void run(framework::ThreadHandle&) override; - uint32_t getTotalPendingCount() const; - void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; -}; - -} diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 6606b24288e..524e7c19d53 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -2,7 +2,6 @@ #include "servicelayernode.h" #include "bouncer.h" -#include "bucketintegritychecker.h" #include "communicationmanager.h" #include "changedbucketownershiphandler.h" #include "mergethrottler.h" @@ -242,7 +241,6 @@ ServiceLayerNode::createChain() auto* merge_throttler = new MergeThrottler(_configUri, compReg); chain->push_back(StorageLink::UP(merge_throttler)); chain->push_back(StorageLink::UP(new ChangedBucketOwnershipHandler(_configUri, compReg))); - chain->push_back(StorageLink::UP(new BucketIntegrityChecker(_configUri, compReg))); chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); chain->push_back(StorageLink::UP(new StorageBucketDBInitializer( _configUri, _partitions, getDoneInitializeHandler(), compReg))); |