7 files changed, 1 insertions, 1672 deletions
diff --git a/storage/src/tests/bucketdb/CMakeLists.txt b/storage/src/tests/bucketdb/CMakeLists.txt index a182fed02ea..2acaea1940d 100644 --- a/storage/src/tests/bucketdb/CMakeLists.txt +++ b/storage/src/tests/bucketdb/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_executable(storage_bucketdb_gtest_runner_app TEST bucketinfotest.cpp bucketmanagertest.cpp gtest_runner.cpp - initializertest.cpp judyarraytest.cpp judymultimaptest.cpp lockablemaptest.cpp diff --git a/storage/src/tests/bucketdb/initializertest.cpp b/storage/src/tests/bucketdb/initializertest.cpp deleted file mode 100644 index a39be7910f1..00000000000 --- a/storage/src/tests/bucketdb/initializertest.cpp +++ /dev/null @@ -1,596 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Tests storage initialization without depending on persistence layer. - */ -#include <vespa/document/base/testdocman.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/bucketdb/lockablemap.hpp> -#include <vespa/storage/bucketdb/storagebucketdbinitializer.h> -#include <vespa/storage/persistence/filestorage/filestormanager.h> -#include <vespa/storageapi/message/bucket.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/state.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/dummystoragelink.h> -#include <tests/common/testhelper.h> -#include <vespa/vdstestlib/config/dirconfig.hpp> -#include <vespa/vespalib/gtest/gtest.h> - -#include <vespa/log/log.h> -LOG_SETUP(".test.bucketdb.initializing"); - -using document::FixedBucketSpaces; -using namespace ::testing; - -namespace storage { - -typedef uint16_t PartitionId; - -struct InitializerTest : public Test { - - class InitParams { - vdstestlib::DirConfig config; - bool configFinalized; - - public: - uint32_t bucketBitsUsed; - NodeIndex nodeIndex; - NodeCount nodeCount; - Redundancy redundancy; - uint32_t docsPerDisk; - DiskCount diskCount; - bool bucketWrongDisk; - bool bucketMultipleDisks; - bool failingListRequest; - bool failingInfoRequest; - - InitParams() - : config(getStandardConfig(true)), - configFinalized(false), - bucketBitsUsed(4), - nodeIndex(0), - nodeCount(10), - redundancy(2), - docsPerDisk(10), - diskCount(1), - bucketWrongDisk(false), - bucketMultipleDisks(false), - failingListRequest(false), - failingInfoRequest(false) - {} - - vdstestlib::DirConfig& getConfig() { - if (!configFinalized) { - config.getConfig("stor-server") - .setValue("node_index", nodeIndex); - config.getConfig("stor-distribution") - .setValue("redundancy", redundancy); - configFinalized = true; - } - return config; - } - - }; - - document::TestDocMan _docMan; - - void do_test_initialization(InitParams& params); -}; - -TEST_F(InitializerTest, init_with_empty_node) { - InitParams params; - params.docsPerDisk = 0; - do_test_initialization(params); -} - -TEST_F(InitializerTest, init_with_data_on_single_disk) { - InitParams params; - params.diskCount = DiskCount(1); - do_test_initialization(params); -} - -TEST_F(InitializerTest, init_with_multiple_disks) { - InitParams params; - do_test_initialization(params); -} - -TEST_F(InitializerTest, init_with_bucket_on_wrong_disk) { - InitParams params; - params.bucketWrongDisk = true; - params.bucketBitsUsed = 58; - do_test_initialization(params); -} - -namespace { -// Data kept on buckets we're using in test. 
-struct BucketData { - api::BucketInfo info; - - BucketData() : info(0, 0, 0, 0, 0) { - } - - BucketData operator+(const BucketData& other) const { - BucketData copy; - copy.info.setDocumentCount( - info.getDocumentCount() + other.info.getDocumentCount()); - copy.info.setTotalDocumentSize( - info.getTotalDocumentSize() - + other.info.getTotalDocumentSize()); - copy.info.setChecksum( - info.getChecksum() * other.info.getChecksum()); - return copy; - } -}; -// Data residing on one disk -typedef std::map<document::BucketId, BucketData> DiskData; -struct BucketInfoLogger { - std::map<PartitionId, DiskData>& map; - - explicit BucketInfoLogger(std::map<PartitionId, DiskData>& m) - : map(m) {} - - StorBucketDatabase::Decision operator()( - uint64_t revBucket, const StorBucketDatabase::Entry& entry) - { - document::BucketId bucket(document::BucketId::keyToBucketId(revBucket)); - assert(bucket.getRawId() != 0); - assert(entry.getBucketInfo().valid()); - DiskData& ddata(map[0]); - BucketData& bdata(ddata[bucket]); - bdata.info = entry.getBucketInfo(); - return StorBucketDatabase::Decision::CONTINUE; - } -}; -std::map<PartitionId, DiskData> -createMapFromBucketDatabase(StorBucketDatabase& db) { - std::map<PartitionId, DiskData> result; - BucketInfoLogger infoLogger(result); - db.acquire_read_guard()->for_each(std::ref(infoLogger)); - return result; -} -// Create data we want to have in this test -std::map<PartitionId, DiskData> -buildBucketInfo(const document::TestDocMan& docMan, - InitializerTest::InitParams& params) -{ - std::map<PartitionId, DiskData> result; - for (uint32_t i=0; i<params.diskCount; ++i) { - result[i]; - } - lib::Distribution distribution( - lib::Distribution::getDefaultDistributionConfig( - params.redundancy, params.nodeCount)); - document::BucketIdFactory bucketIdFactory; - lib::NodeState nodeState; - nodeState.setDiskCount(params.diskCount); - - uint64_t totalDocs = params.docsPerDisk * params.diskCount; - for (uint32_t i=0, n=totalDocs; i<n; ++i) { - bool useWrongDisk = false; - if (i == 1 && params.bucketWrongDisk) { - useWrongDisk = true; - } - document::Document::SP doc(docMan.createRandomDocument(i)); - if (i == 3 && params.bucketMultipleDisks) { - doc = docMan.createRandomDocument(i - 1); - useWrongDisk = true; - } - document::BucketId bid(bucketIdFactory.getBucketId(doc->getId())); - bid.setUsedBits(params.bucketBitsUsed); - bid = bid.stripUnused(); - uint32_t partition(distribution.getIdealDisk( - nodeState, params.nodeIndex, bid, - lib::Distribution::IDEAL_DISK_EVEN_IF_DOWN)); - if (useWrongDisk) { - int correctPart = partition; - partition = (partition + 1) % params.diskCount; - LOG(debug, "Putting bucket %s on wrong disk %u instead of %u", - bid.toString().c_str(), partition, correctPart); - } - LOG(debug, "Putting bucket %s on disk %u", - bid.toString().c_str(), partition); - BucketData& data(result[partition][bid]); - data.info.setDocumentCount(data.info.getDocumentCount() + 1); - data.info.setTotalDocumentSize( - data.info.getTotalDocumentSize() + 100); - data.info.setChecksum(data.info.getChecksum() * 3); - } - return result; -} -void verifyEqual(std::map<PartitionId, DiskData>& org, - std::map<PartitionId, DiskData>& existing) -{ - uint32_t equalCount = 0; - std::map<PartitionId, DiskData>::const_iterator part1(org.begin()); - std::map<PartitionId, DiskData>::const_iterator part2(existing.begin()); - while (part1 != org.end() && part2 != existing.end()) { - if (part1->first < part2->first) { - if (!part1->second.empty()) { - FAIL() << "No data in partition " <<
part1->first << " found."; - } - ++part1; - } else if (part1->first > part2->first) { - if (!part2->second.empty()) { - FAIL() << "Found data in partition " << part2->first - << " which should not exist."; - } - ++part2; - } else { - auto bucket1 = part1->second.begin(); - auto bucket2 = part2->second.begin(); - while (bucket1 != part1->second.end() - && bucket2 != part2->second.end()) - { - if (bucket1->first < bucket2->first) { - FAIL() << "No data in partition " << part1->first - << " for bucket " << bucket1->first << " found."; - } else if (bucket1->first.getId() > bucket2->first.getId()) - { - FAIL() << "Found data in partition " << part2->first - << " for bucket " << bucket2->first - << " which should not exist."; - } else if (!(bucket1->second.info == bucket2->second.info)) { - FAIL() << "Bucket " << bucket1->first << " on partition " - << part1->first << " has bucket info " - << bucket2->second.info << " and not " - << bucket1->second.info << " as expected."; - } - ++bucket1; - ++bucket2; - ++equalCount; - } - if (bucket1 != part1->second.end()) { - FAIL() << "No data in partition " << part1->first - << " for bucket " << bucket1->first << " found."; - } - if (bucket2 != part2->second.end()) { - FAIL() << "Found data in partition " << part2->first - << " for bucket " << bucket2->first - << " which should not exist."; - } - ++part1; - ++part2; - } - } - if (part1 != org.end() && !part1->second.empty()) { - FAIL() << "No data in partition " << part1->first << " found."; - } - if (part2 != existing.end() && !part2->second.empty()) { - FAIL() << "Found data in partition " << part2->first - << " which should not exist."; - } -} - -struct MessageCallback -{ -public: - virtual ~MessageCallback() = default; - virtual void onMessage(const api::StorageMessage&) = 0; -}; - -struct FakePersistenceLayer : public StorageLink { - StorBucketDatabase& bucketDatabase; - std::map<PartitionId, DiskData>& data; - std::string firstFatal; - std::string fatalError; - MessageCallback* messageCallback; - - FakePersistenceLayer(std::map<PartitionId, DiskData>& d, - StorBucketDatabase& db) - : StorageLink("fakepersistencelayer"), - bucketDatabase(db), - data(d), - messageCallback(0) - { - } - - void fatal(vespalib::stringref error) { - fatalError = error; - if (firstFatal.empty()) firstFatal = fatalError; - } - const BucketData* getBucketData(PartitionId partition, - const document::BucketId& bucket, - vespalib::stringref opname) - { - std::map<PartitionId, DiskData>::const_iterator it( - data.find(partition)); - if (it == data.end()) { - std::ostringstream ost; - ost << bucket << " is stated to be on partition " << partition - << " in operation " << opname << ", but we have no data for " - << "it there."; - fatal(ost.str()); - } else { - auto it2 = it->second.find(bucket); - if (it2 == it->second.end()) { - std::ostringstream ost; - ost << "Have no data for " << bucket << " on disk " << partition - << " in operation " << opname; - fatal(ost.str()); - } else { - const BucketData& bucketData(it2->second); - return &bucketData; - } - } - return 0; - } - - bool onDown(const api::StorageMessage::SP& msg) override { - fatalError = ""; - if (messageCallback) { - messageCallback->onMessage(*msg); - } - if (msg->getType() == api::MessageType::INTERNAL) { - auto& cmd = dynamic_cast<api::InternalCommand&>(*msg); - if (cmd.getType() == ReadBucketList::ID) { - auto& rbl = dynamic_cast<ReadBucketList&>(cmd); - ReadBucketListReply::SP reply(new ReadBucketListReply(rbl)); - std::map<PartitionId, DiskData>::const_iterator it( 
- data.find(rbl.getPartition())); - if (it == data.end()) { - std::ostringstream ost; - ost << "Got list request to partition " - << rbl.getPartition() - << " for which we should not get a request"; - fatal(ost.str()); - } else { - if (cmd.getBucket().getBucketSpace() == FixedBucketSpaces::default_space()) { - for (const auto& bd : it->second) { - reply->getBuckets().push_back(bd.first); - } - } - } - if (!fatalError.empty()) { - reply->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, fatalError)); - } - sendUp(reply); - } else if (cmd.getType() == ReadBucketInfo::ID) { - auto& rbi = dynamic_cast<ReadBucketInfo&>(cmd); - ReadBucketInfoReply::SP reply(new ReadBucketInfoReply(rbi)); - StorBucketDatabase::WrappedEntry entry( - bucketDatabase.get(rbi.getBucketId(), "fakelayer")); - if (!entry.exist()) { - fatal("Bucket " + rbi.getBucketId().toString() - + " did not exist in bucket database but we got " - + "read bucket info request for it."); - } else { - const BucketData* bucketData(getBucketData(0, rbi.getBucketId(), "readbucketinfo")); - if (bucketData != 0) { - entry->setBucketInfo(bucketData->info); - entry.write(); - } - } - if (!fatalError.empty()) { - reply->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, fatalError)); - } - sendUp(reply); - } else if (cmd.getType() == InternalBucketJoinCommand::ID) { - auto& ibj = dynamic_cast<InternalBucketJoinCommand&>(cmd); - InternalBucketJoinReply::SP reply( - new InternalBucketJoinReply(ibj)); - StorBucketDatabase::WrappedEntry entry( - bucketDatabase.get(ibj.getBucketId(), "fakelayer")); - if (!entry.exist()) { - fatal("Bucket " + ibj.getBucketId().toString() - + " did not exist in bucket database but we got " - + "read bucket info request for it."); - } else { - const BucketData* source(getBucketData( - ibj.getDiskOfInstanceToJoin(), ibj.getBucketId(), - "internaljoinsource")); - const BucketData* target(getBucketData( - ibj.getDiskOfInstanceToKeep(), ibj.getBucketId(), - "internaljointarget")); - if (source != 0 && target != 0) { - entry->setBucketInfo((*source + *target).info); - entry.write(); - } - } - if (!fatalError.empty()) { - reply->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, fatalError)); - } - sendUp(reply); - } else { - return false; - } - return true; - } - return false; - } -}; - -} // end of anonymous namespace - -void -InitializerTest::do_test_initialization(InitParams& params) -{ - std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params)); - - assert(params.diskCount == 1u); - TestServiceLayerApp node(params.nodeIndex, params.getConfig().getConfigId()); - DummyStorageLink top; - StorageBucketDBInitializer* initializer; - FakePersistenceLayer* bottom; - top.push_back(StorageLink::UP(initializer = new StorageBucketDBInitializer( - params.getConfig().getConfigId(), - node.getDoneInitializeHandler(), - node.getComponentRegister()))); - top.push_back(StorageLink::UP(bottom = new FakePersistenceLayer( - data, node.getStorageBucketDatabase()))); - - LOG(debug, "STARTING INITIALIZATION"); - top.open(); - - node.waitUntilInitialized(initializer); - - std::map<PartitionId, DiskData> initedBucketDatabase( - createMapFromBucketDatabase(node.getStorageBucketDatabase())); - verifyEqual(data, initedBucketDatabase); -} - -TEST_F(InitializerTest, bucket_progress_calculator) { - using document::BucketId; - StorageBucketDBInitializer::BucketProgressCalculator calc; - // We consider the given bucket as not being completed, so progress - // will be _up to_, not _including_ the bucket. 
This means we can never - // reach 1.0, so progress completion must be handled by other logic! - EXPECT_DOUBLE_EQ(0.0, calc.calculateProgress(BucketId(1, 0))); - EXPECT_DOUBLE_EQ(0.0, calc.calculateProgress(BucketId(32, 0))); - - EXPECT_DOUBLE_EQ(0.5, calc.calculateProgress(BucketId(1, 1))); - - EXPECT_DOUBLE_EQ(0.25, calc.calculateProgress(BucketId(2, 2))); - EXPECT_DOUBLE_EQ(0.5, calc.calculateProgress(BucketId(2, 1))); - EXPECT_DOUBLE_EQ(0.75, calc.calculateProgress(BucketId(2, 3))); - - EXPECT_DOUBLE_EQ(0.875, calc.calculateProgress(BucketId(3, 7))); -} - -struct DatabaseInsertCallback : MessageCallback -{ - DiskData& _data; - StorBucketDatabase& _database; - TestServiceLayerApp& _app; - const InitializerTest::InitParams& _params; - bool _invoked; - double _lastSeenProgress; - uint8_t _expectedReadBucketPriority; - std::ostringstream _errors; - DatabaseInsertCallback(DiskData& data, - StorBucketDatabase& db, - TestServiceLayerApp& app, - const InitializerTest::InitParams& params) - : _data(data), - _database(db), - _app(app), - _params(params), - _invoked(false), - _lastSeenProgress(0), - _expectedReadBucketPriority(255) - {} - - void onMessage(const api::StorageMessage& msg) override - { - // Always make sure we're not set as initialized while we're still - // processing messages! Also ensure progress never goes down. - lib::NodeState::CSP reportedState( - _app.getStateUpdater().getReportedNodeState()); - double progress(reportedState->getInitProgress().getValue()); - LOG(debug, "reported progress is now %g", progress); - if (progress >= 1.0) { - _errors << "progress exceeded 1.0: " << progress << "\n"; - } - if (progress < _lastSeenProgress) { - _errors << "progress went down! " - << _lastSeenProgress << " -> " << progress - << "\n"; - } - // 16 bits is allowed before we have listed any buckets at all - // since we at that point have no idea and have not reported anything - // back to the fleetcontroller. - if (_params.bucketBitsUsed != reportedState->getMinUsedBits() - && !(reportedState->getMinUsedBits() == 16 && !_invoked)) - { - _errors << "reported state contains wrong min used bits. " - << "expected " << _params.bucketBitsUsed - << ", but got " << reportedState->getMinUsedBits() - << "\n"; - } - _lastSeenProgress = progress; - if (_invoked) { - return; - } - - if (msg.getType() == api::MessageType::INTERNAL) { - auto& cmd = dynamic_cast<const api::InternalCommand&>(msg); - if (cmd.getType() == ReadBucketInfo::ID) { - if (cmd.getPriority() != _expectedReadBucketPriority) { - _errors << "expected ReadBucketInfo priority of " - << static_cast<int>(_expectedReadBucketPriority) - << ", was " << static_cast<int>(cmd.getPriority()); - } - // As soon as we get the first ReadBucketInfo, we insert new buckets - // into the bucket database in order to simulate external - // load init. Kinda hacky, but should work as long as initializer - // always does at least 1 extra iteration pass (which we use - // config overrides to ensure happens).
- _invoked = true; - for (int i = 0; i < 4; ++i) { - document::BucketId bid(16 + i, 8); // not the first, nor the last bucket - BucketData d; - StorBucketDatabase::WrappedEntry entry( - _database.get(bid, "DatabaseInsertCallback::onMessage", - StorBucketDatabase::CREATE_IF_NONEXISTING)); - if (entry.preExisted()) { - _errors << "db entry for " << bid << " already existed"; - } - if (i < 5) { - d.info = api::BucketInfo(3+i, 4+i, 5+i, 6+i, 7+i); - } - _data[bid] = d; - entry->setBucketInfo(d.info); - entry.write(); - } - } - } - } -}; - -TEST_F(InitializerTest, buckets_initialized_by_load) { - InitParams params; - params.docsPerDisk = 100; - params.diskCount = DiskCount(1); - params.getConfig().getConfig("stor-bucket-init").setValue("max_pending_info_reads_per_disk", 1); - params.getConfig().getConfig("stor-bucket-init").setValue("min_pending_info_reads_per_disk", 1); - params.getConfig().getConfig("stor-bucket-init") - .setValue("info_read_priority", 231); - - std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params)); - - assert(params.diskCount == 1u); - TestServiceLayerApp node(params.nodeIndex, - params.getConfig().getConfigId()); - DummyStorageLink top; - StorageBucketDBInitializer* initializer; - FakePersistenceLayer* bottom; - top.push_back(StorageLink::UP(initializer = new StorageBucketDBInitializer( - params.getConfig().getConfigId(), - node.getDoneInitializeHandler(), - node.getComponentRegister()))); - top.push_back(StorageLink::UP(bottom = new FakePersistenceLayer( - data, node.getStorageBucketDatabase()))); - - DatabaseInsertCallback callback(data[0], node.getStorageBucketDatabase(), - node, params); - callback._expectedReadBucketPriority = 231; - - bottom->messageCallback = &callback; - - top.open(); - - node.waitUntilInitialized(initializer); - // Must explicitly wait until initializer has closed to ensure node state - // has been set. - top.close(); - - ASSERT_TRUE(callback._invoked); - EXPECT_EQ(std::string(), callback._errors.str()); - - std::map<PartitionId, DiskData> initedBucketDatabase( - createMapFromBucketDatabase(node.getStorageBucketDatabase())); - verifyEqual(data, initedBucketDatabase); - - lib::NodeState::CSP reportedState( - node.getStateUpdater().getReportedNodeState()); - - double progress(reportedState->getInitProgress().getValue()); - EXPECT_GE(progress, 1.0); - EXPECT_LT(progress, 1.0001); - - EXPECT_EQ(params.bucketBitsUsed, reportedState->getMinUsedBits()); -} - -} // storage diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index edea33491af..a2a0cbd1e68 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "teststorageapp.h" -#include <vespa/storage/bucketdb/storagebucketdbinitializer.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/config-stor-distribution.h> #include <vespa/config-load-type.h> @@ -12,6 +11,7 @@ #include <vespa/config/config.h> #include <vespa/config/helper/configgetter.hpp> #include <thread> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".test.servicelayerapp"); @@ -116,7 +116,6 @@ TestStorageApp::waitUntilInitialized( << timeout << " seconds."; if (initializer != 0) { error << " "; - initializer->reportStatus(error, framework::HttpUrlPath("")); LOG(error, "%s", error.str().c_str()); throw std::runtime_error(error.str()); } diff --git a/storage/src/vespa/storage/bucketdb/CMakeLists.txt b/storage/src/vespa/storage/bucketdb/CMakeLists.txt index 048cc25ec95..5f491d80697 100644 --- a/storage/src/vespa/storage/bucketdb/CMakeLists.txt +++ b/storage/src/vespa/storage/bucketdb/CMakeLists.txt @@ -11,7 +11,6 @@ vespa_add_library(storage_bucketdb OBJECT generic_btree_bucket_database.cpp judyarray.cpp lockablemap.cpp - storagebucketdbinitializer.cpp storbucketdb.cpp DEPENDS ) diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp deleted file mode 100644 index 0761d845ce6..00000000000 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ /dev/null @@ -1,844 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storagebucketdbinitializer.h" -#include "lockablemap.hpp" -#include "config-stor-bucket-init.h" -#include "storbucketdb.h" -#include <vespa/storage/common/nodestateupdater.h> -#include <vespa/storage/common/content_bucket_space_repo.h> -#include <vespa/storage/storageserver/storagemetricsset.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/vespalib/io/fileutil.h> -#include <vespa/config/config.h> -#include <vespa/vespalib/stllike/hash_map.hpp> -#include <vespa/config/helper/configgetter.hpp> -#include <iomanip> -#include <chrono> - -#include <vespa/log/bufferedlogger.h> -LOG_SETUP(".storage.bucketdb.initializer"); - -using document::BucketSpace; -using namespace std::chrono_literals; - -namespace storage { - -using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>; - -struct BucketReadState { - using UP = std::unique_ptr<BucketReadState>; - - BucketSet _pending; - document::BucketId _databaseIterator; - bool _done; - - BucketReadState() : _done(false) {} -}; - -using vespa::config::content::core::StorBucketInitConfig; - -StorageBucketDBInitializer::Config::Config(const config::ConfigUri & configUri) - : _listPriority(0), - _infoReadPriority(255), - _minPendingInfoReadsPerDisk(16), - _maxPendingInfoReadsPerDisk(32) -{ - auto config = config::ConfigGetter<StorBucketInitConfig>::getConfig(configUri.getConfigId(), - configUri.getContext()); - _maxPendingInfoReadsPerDisk = config->maxPendingInfoReadsPerDisk; - _minPendingInfoReadsPerDisk = config->minPendingInfoReadsPerDisk; - _infoReadPriority = config->infoReadPriority; - _listPriority = config->listPriority; - if (config->completeListBeforeStartingRead) { - LOG(warning, "This config option is currently not honored. 
Info " - "reading will always start on a directory as soon as " - "it is done listing."); - } - LOG(debug, "Initializing bucket database: List priority %u, info priority " - "%u, min/max pending info per disk %u/%u.", - _listPriority, _infoReadPriority, - _minPendingInfoReadsPerDisk, _maxPendingInfoReadsPerDisk); -} - -StorageBucketDBInitializer::System::~System() = default; - -StorageBucketDBInitializer::System::System( - DoneInitializeHandler& doneInitializeHandler, - ServiceLayerComponentRegister& compReg, - const Config&) - : _doneInitializeHandler(doneInitializeHandler), - _component(compReg, "storagebucketdbinitializer"), - _bucketSpaceRepo(_component.getBucketSpaceRepo()), - _nodeIndex(_component.getIndex()), - _nodeState() -{ - // Is this correct? We should get the node state from the node state updater - // so it could work with disk capacities. Object is used to check for - // correct disk further down (in the case of internal join, deciding which - // should have it). Not that bad if wrong disk is picked though. - _nodeState.setDiskCount(1); -} - -StorBucketDatabase & -StorageBucketDBInitializer::System::getBucketDatabase(document::BucketSpace bucketSpace) const -{ - return _component.getBucketDatabase(bucketSpace); -} - -StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component) - : metrics::MetricSet("dbinit", {}, - "Metrics for the storage bucket database initializer"), - _wrongDisk("wrongdisk", {}, - "Number of buckets found on non-ideal disk.", this), - _insertedCount("insertedcount", {}, - "Number of buckets inserted into database in list step.", this), - _joinedCount("joinedcount", {}, - "Number of buckets found in list step already found " - "(added from other disks).", this), - _infoReadCount("infocount", {}, - "Number of buckets we have read bucket information from.", this), - _infoSetByLoad("infosetbyload", {}, - "Number of buckets we did not need to request bucket info for " - "due to load already having updated them.", this), - _dirsListed("dirslisted", {}, - "Directories listed in list step of initialization.", this), - _startTime(component.getClock()), - _listLatency("listlatency", {}, - "Time used until list phase is done. (in ms)", this), - _initLatency("initlatency", {}, - "Time used until initialization is complete. 
(in ms)", this) -{ - component.registerMetric(*this); -} - -StorageBucketDBInitializer::Metrics::~Metrics() {} - -StorageBucketDBInitializer::GlobalState::GlobalState() - : _lists(), _joins(), _infoRequests(), _replies(), - _insertedCount(0), _infoReadCount(0), - _infoSetByLoad(0), _dirsListed(0), _dirsToList(0), - _gottenInitProgress(false), _doneListing(false), - _doneInitializing(false), _workerLock(), _workerCond(), _replyLock() -{ } -StorageBucketDBInitializer::GlobalState::~GlobalState() { } - -StorageBucketDBInitializer::StorageBucketDBInitializer( - const config::ConfigUri & configUri, - DoneInitializeHandler& doneInitializeHandler, - ServiceLayerComponentRegister& compReg) - : StorageLink("StorageBucketDBInitializer"), - framework::HtmlStatusReporter("dbinit", "Bucket database initializer"), - _config(configUri), - _system(doneInitializeHandler, compReg, _config), - _metrics(_system._component), - _state(), - _readState(1u) -{ - // Initialize read state for disks being available - for (uint32_t i=0; i< _readState.size(); ++i) { - _readState[i] = std::make_unique<BucketSpaceReadState>(); - for (const auto &elem : _system._bucketSpaceRepo) { - _readState[i]->emplace(elem.first, std::make_unique<BucketReadState>()); - _state._dirsToList += 1; - } - } - _system._component.registerStatusPage(*this); -} - -StorageBucketDBInitializer::~StorageBucketDBInitializer() -{ - if (_system._thread.get() != 0) { - LOG(error, "Deleted without calling close() first"); - onClose(); - } - closeNextLink(); -} - -void -StorageBucketDBInitializer::onOpen() -{ - // Trigger bucket database initialization - for (uint32_t i=0; i< _readState.size(); ++i) { - assert(_readState[i]); - const BucketSpaceReadState &spaceState = *_readState[i]; - for (const auto &stateElem : spaceState) { - document::BucketSpace bucketSpace = stateElem.first; - auto msg = std::make_shared<ReadBucketList>(bucketSpace, spi::PartitionId(i)); - _state._lists[msg->getMsgId()] = msg; - sendDown(msg); - } - } - _system._thread = _system._component.startThread(*this, 10ms, 1s); -} - -void -StorageBucketDBInitializer::onClose() -{ - if (_system._thread) { - _system._thread->interruptAndJoin(_state._workerCond); - _system._thread.reset(); - } -} - -void -StorageBucketDBInitializer::run(framework::ThreadHandle& thread) -{ - std::unique_lock<std::mutex> guard(_state._workerLock); - while (!thread.interrupted() && !_state._doneInitializing) { - std::list<api::StorageMessage::SP> replies; - { - std::lock_guard<std::mutex> replyGuard(_state._replyLock); - _state._replies.swap(replies); - } - for (api::StorageMessage::SP & msg : replies) { - api::InternalReply& reply(static_cast<api::InternalReply&>(*msg)); - if (reply.getType() == ReadBucketListReply::ID) { - handleReadBucketListReply(static_cast<ReadBucketListReply&>(reply)); - } else if (reply.getType() == ReadBucketInfoReply::ID) { - handleReadBucketInfoReply(static_cast<ReadBucketInfoReply&>(reply)); - } else if (reply.getType() == InternalBucketJoinReply::ID) { - handleInternalBucketJoinReply(static_cast<InternalBucketJoinReply&>(reply)); - } - } - if (_state._gottenInitProgress) { - _state._gottenInitProgress = false; - updateInitProgress(); - } - if (replies.empty()) { - _state._workerCond.wait_for(guard, 10ms); - thread.registerTick(framework::WAIT_CYCLE); - } else { - thread.registerTick(framework::PROCESS_CYCLE); - } - } -} - -void -StorageBucketDBInitializer::print( - std::ostream& out, bool verbose, const std::string& indent) const -{ - (void) verbose; (void) indent; - out << 
"StorageBucketDBInitializer()"; -} - -namespace { - -size_t -notDoneCount(const StorageBucketDBInitializer::ReadState &readState) -{ - size_t result = 0; - for (const auto &elem : readState) { - if (elem) { - for (const auto &stateElem : *elem) { - if (!stateElem.second->_done) { - ++result; - } - } - } - } - return result; -} - -} - -void -StorageBucketDBInitializer::reportHtmlStatus( - std::ostream& out, const framework::HttpUrlPath&) const -{ - std::lock_guard<std::mutex> guard(_state._workerLock); - out << "\n <h2>Config</h2>\n" - << " <table>\n" - << " <tr><td>Max pending info reads per disk</td><td>" - << _config._maxPendingInfoReadsPerDisk << "</td></tr>\n" - << " <tr><td>Min pending info reads per disk</td><td>" - << _config._minPendingInfoReadsPerDisk << "</td></tr>\n" - << " <tr><td>List priority</td><td>" - << _config._listPriority << "</td></tr>\n" - << " <tr><td>Info read priority</td><td>" - << _config._infoReadPriority << "</td></tr>\n" - << " </table>\n"; - - out << "\n <h2>Init progress</h2>\n"; - if (_state._doneListing) { - out << " Done listing.<br/>\n"; - } else { - out << " Listed " << _state._dirsListed << " of " - << _state._dirsToList << " partitions.<br/>\n"; - } - if (_state._lists.empty()) { - out << " No lists pending.<br/>\n"; - } else { - out << " " << _state._lists.size() << " lists pending.<br/>\n"; - } - if (_state._joins.empty()) { - out << " No internal joins pending.<br/>\n"; - } else { - out << " " << _state._joins.size() - << " internal joins pending.<br/>\n"; - } - if (_state._infoRequests.empty()) { - out << " No info requests pending.<br/>\n"; - } else { - out << " " << _state._infoRequests.size() - << " info requests pending.<br/>\n"; - } - uint32_t incompleteScan = notDoneCount(_readState); - if (incompleteScan == 0) { - out << " Done iterating bucket database to generate info " - << "requests.<br/>\n"; - } else { - out << " " << incompleteScan << " partitions still have buckets " - << "that needs bucket info.<br/>\n"; - } - out << " Init progress gotten after state update: " - << (_state._gottenInitProgress ? 
"true" : "false") << "<br/>\n"; - if (_state._doneInitializing) { - out << " Initialization complete.\n"; - } else { - out << " Initialization not completed.\n"; - } - - out << "\n <h2>Metrics</h2>\n"; - out << " " << _metrics._insertedCount.toString(true) << "<br/>\n" - << " " << _metrics._joinedCount.toString(true) << "<br/>\n" - << " " << _metrics._infoReadCount.toString(true) << "<br/>\n" - << " " << _metrics._infoSetByLoad.toString(true) << "<br/>\n" - << " " << _metrics._dirsListed.toString(true) << "<br/>\n" - << " Dirs to list " << _state._dirsToList << "<br/>\n"; - if (!_state._joins.empty()) { - out << "\n <h2>Pending internal bucket joins</h2>\n"; - for (const auto & e : _state._joins) { - out << " " << e.first << " - " << *e.second << "<br/>\n"; - } - } - out << "\n <h2>Info read state</h2>\n"; - std::map<Disk, uint32_t> pendingCounts; - for (const auto & e : _state._infoRequests) - { - ++pendingCounts[e.second]; - } - for (uint32_t i=0; i<_readState.size(); ++i) { - if (_readState[i].get() == 0) { - out << " <h3>Disk " << i << " is down</h3>\n"; - continue; - } - const BucketSpaceReadState& spaceState(*_readState[i]); - for (const auto &stateElem : spaceState) { - const BucketReadState &state = *stateElem.second; - out << " <h3>Disk " << i << ", bucket space " << stateElem.first.getId() << "</h3>\n"; - out << " Pending info requests: " << pendingCounts[i] << " ("; - if (state._pending.empty()) { - out << "none"; - } else { - bool first = true; - for (BucketSet::const_iterator it = state._pending.begin(); - it != state._pending.end(); ++it) { - if (!first) { - out << ", "; - } else { - first = false; - } - out << *it; - } - } - out << ")<br/>\n"; - out << " Bucket database iterator: " << state._databaseIterator - << "<br/>\n"; - out << " Done iterating bucket database. " - << (state._done ? "true" : "false") << "<br/>\n"; - } - } - for (std::map<Disk, uint32_t>::iterator it = pendingCounts.begin(); - it != pendingCounts.end(); ++it) - { - out << " Disk " << it->first << ": " << it->second << "<br/>\n"; - } -} - -// Always called from worker thread. Worker monitor already grabbed -void -StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket, - const lib::Distribution &distribution, - spi::PartitionId partition, - api::BucketInfo bucketInfo) -{ - document::BucketId bucketId(bucket.getBucketId()); - StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(bucket.getBucketSpace()).get( - bucketId, "StorageBucketDBInitializer::registerBucket", - StorBucketDatabase::CREATE_IF_NONEXISTING)); - if (bucketInfo.valid()) { - if (entry.preExisted()) { - LOG(debug, "Had value %s for %s before registering", - entry->getBucketInfo().toString().c_str(), - bucketId.toString().c_str()); - } - LOG(debug, "Got new value %s from %s partition %u", - bucketInfo.toString().c_str(), bucketId.toString().c_str(), - partition.getValue()); - entry->setBucketInfo(bucketInfo); - } else { - LOG(debug, "Got invalid bucket info from %s partition %u: %s", - bucketId.toString().c_str(), partition.getValue(), - bucketInfo.toString().c_str()); - } - if (entry.preExisted()) { - if (0 == partition) { - LOG(debug, "%s already existed in bucket database on disk %i. 
" - "Might have been moved from wrong directory prior to " - "listing this directory.", - bucketId.toString().c_str(), int(partition)); - return; - } - uint32_t keepOnDisk, joinFromDisk; - if (distribution.getPreferredAvailableDisk( - _system._nodeState, _system._nodeIndex, - bucketId.stripUnused()) == partition) - { - keepOnDisk = partition; - joinFromDisk = 0; - } else { - keepOnDisk = 0; - joinFromDisk = partition; - } - LOG(debug, "%s exist on both disk 0 and disk %i. Joining two versions " - "onto disk %u.", - bucketId.toString().c_str(), int(partition), keepOnDisk); - entry.unlock(); - // Must not have bucket db lock while sending down - auto cmd = std::make_shared<InternalBucketJoinCommand>(bucket, keepOnDisk, joinFromDisk); - { - _state._joins[cmd->getMsgId()] = cmd; - } - sendDown(cmd); - } else { - _system._component.getMinUsedBitsTracker().update(bucketId); - LOG(spam, "Inserted %s on disk %i into bucket database", - bucketId.toString().c_str(), int(partition)); - entry.write(); - uint16_t disk(distribution.getIdealDisk( - _system._nodeState, _system._nodeIndex, bucketId.stripUnused(), - lib::Distribution::IDEAL_DISK_EVEN_IF_DOWN)); - if (disk != partition) { - _metrics._wrongDisk.inc(); - } - - _metrics._insertedCount.inc(); - ++_state._insertedCount; - } -} - -namespace { - struct NextBucketOnDiskFinder { - typedef document::BucketId BucketId; - - uint16_t _disk; - BucketId& _iterator; - uint16_t _count; - std::vector<BucketId> _next; - uint32_t _alreadySet; - - NextBucketOnDiskFinder(uint16_t disk, BucketId& iterator, - uint16_t maxToFind) - : _disk(disk), _iterator(iterator), _count(maxToFind), - _next(), _alreadySet(0) {} - - StorBucketDatabase::Decision operator()( - uint64_t revBucket, const StorBucketDatabase::Entry& entry) - { - BucketId bucket(BucketId::keyToBucketId(revBucket)); - if (bucket == _iterator) { - //LOG(spam, "Ignoring bucket %s as it has value of current " - // "iterator", bucket.toString().c_str()); - return StorBucketDatabase::Decision::CONTINUE; - } - _iterator = bucket; - if (0 != _disk) { - //LOG(spam, "Ignoring bucket %s as it is not on disk currently " - // "being processed", bucket.toString().c_str()); - // Ignore. We only want to scan for one disk - } else if (entry.valid()) { - LOG(spam, "%s already initialized by load %s. " - "Not requesting info", - bucket.toString().c_str(), - entry.getBucketInfo().toString().c_str()); - ++_alreadySet; - } else { - _next.push_back(_iterator); - if (_next.size() >= _count) { - LOG(spam, "Aborting iterating for disk %u as we have " - "enough results. Leaving iterator at %s", - uint32_t(_disk), _iterator.toString().c_str()); - return StorBucketDatabase::Decision::ABORT; - } - } - return StorBucketDatabase::Decision::CONTINUE; - } - }; -} - -// Always called from worker thread. It holds worker monitor. -void -StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk, document::BucketSpace bucketSpace) -{ - auto itr = _readState[disk]->find(bucketSpace); - assert(itr != _readState[disk]->end()); - BucketReadState& state = *itr->second; - if (state._done - || state._pending.size() >= _config._maxPendingInfoReadsPerDisk) - { - LOG(spam, "No need to iterate further. Database has completed " - "iterating buckets for disk %u.", uint32_t(disk)); - return; - } - uint32_t count(_config._maxPendingInfoReadsPerDisk - state._pending.size()); - NextBucketOnDiskFinder finder(disk, state._databaseIterator, count); - LOG(spam, "Iterating bucket db further. 
Starting at iterator %s", - state._databaseIterator.toString().c_str()); - _system.getBucketDatabase(bucketSpace).for_each( - std::ref(finder), - "StorageBucketDBInitializer::readBucketInfo", - state._databaseIterator.stripUnused().toKey()); - if (finder._alreadySet > 0) { - _metrics._infoSetByLoad.inc(finder._alreadySet); - _state._infoSetByLoad += finder._alreadySet; - } - for (uint32_t i=0; i<finder._next.size(); ++i) { - document::Bucket bucket(bucketSpace, finder._next[i]); - auto cmd = std::make_shared<ReadBucketInfo>(bucket); - cmd->setPriority(_config._infoReadPriority); - state._pending.insert(finder._next[i]); - _state._infoRequests[cmd->getMsgId()] = disk; - LOG(spam, "Requesting bucket info from %s on disk %u.", - finder._next[i].toString().c_str(), uint32_t(disk)); - sendDown(cmd); - } - state._done |= finder._next.empty(); - _state._gottenInitProgress = true; - checkIfDone(); -} - -bool -StorageBucketDBInitializer::onDown( - const std::shared_ptr<api::StorageMessage>& msg) -{ - // If we're done listing, load can go as normal. - // Rationale behind memory_order_relaxed: _doneListing is initially false - // and is ever only written once. Since the behavior for temporarily - // reading a stale default is safe (block the message) and we do not - // access any other shared state dependent on _doneListing, relaxed - // semantics should be fine here. - if (_state._doneListing.load(std::memory_order_relaxed)) { - return StorageLink::onDown(msg); - } - - // If we're not done listing, block most types of load - - // There are no known replies, but if there are to come any, they should - // likely not be blocked. - if (msg->getType().isReply()) return false; - - switch (msg->getType().getId()) { - // Don't want to block communication with state manager - case api::MessageType::SETSYSTEMSTATE_ID: - case api::MessageType::GETNODESTATE_ID: - return StorageLink::onDown(msg); - default: - break; - } - // Fail everything else - std::ostringstream ost; - ost << "Cannot perform operation " << msg->getType() << " now because " - << "we are still listing buckets from disk."; - LOGBP(warning, "%s", ost.str().c_str()); - std::unique_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*msg).makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, ost.str())); - sendUp(std::shared_ptr<api::StorageReply>(reply.release())); - return true; -} - -// Called from disk threads. Just push replies to reply list so worker thread -// can handle it. This minimizes locking needed. Disk reads should be the -// limiting factor, so don't need to update initializer state in multiple -// threads. -bool -StorageBucketDBInitializer::onInternalReply( - const std::shared_ptr<api::InternalReply>& reply) -{ - switch(reply->getType()) { - case ReadBucketListReply::ID: - case ReadBucketInfoReply::ID: - case InternalBucketJoinReply::ID: - { - std::lock_guard<std::mutex> guard(_state._replyLock); - _state._replies.push_back(reply); - return true; - } - default: - return false; - } -} - -// Always called from worker thread. It holds worker monitor. 
-void -StorageBucketDBInitializer::handleReadBucketListReply( - ReadBucketListReply& reply) -{ - vespalib::hash_map<api::StorageMessage::Id, - ReadBucketList::SP>::iterator it( - _state._lists.find(reply.getMsgId())); - if (it == _state._lists.end()) { - LOGBP(warning, "Got bucket list reply for partition %u, request " - "%" PRIu64 ", that was not registered pending.", - reply.getPartition().getValue(), reply.getMsgId()); - } else { - _state._lists.erase(it); - } - // We don't handle failed bucket listings. Kill process. Bucket lists are - // essential for storage node operations - if (reply.getResult().failed()) { - LOG(debug, "Got failing bucket list reply. Requesting shutdown"); - _system._component.requestShutdown( - "Failed to list buckets. Cannot run without bucket list: " - + reply.getResult().toString()); - return; - } - _metrics._dirsListed.inc(); - _state._gottenInitProgress = true; - const spi::BucketIdListResult::List& list(reply.getBuckets()); - api::BucketInfo info; - assert(!info.valid()); - const auto &contentBucketSpace(_system._bucketSpaceRepo.get(reply.getBucketSpace())); - auto distribution(contentBucketSpace.getDistribution()); - for (uint32_t i=0, n=list.size(); i<n; ++i) { - registerBucket(document::Bucket(reply.getBucketSpace(), list[i]), *distribution, reply.getPartition(), info); - } - if (++_state._dirsListed == _state._dirsToList) { - handleListingCompleted(); - } - checkIfDone(); - sendReadBucketInfo(reply.getPartition(), reply.getBucketSpace()); -} - -// Always called from worker thread. It holds worker monitor. -void -StorageBucketDBInitializer::handleReadBucketInfoReply( - ReadBucketInfoReply& reply) -{ - document::BucketSpace bucketSpace = reply.getBucket().getBucketSpace(); - if (reply.getResult().failed()) { - LOGBP(warning, "Deleting %s from bucket database. Cannot use it as we " - "failed to read bucket info for it: %s", - reply.getBucketId().toString().c_str(), - reply.getResult().toString().c_str()); - _system.getBucketDatabase(bucketSpace).erase(reply.getBucketId(), - "dbinit.failedreply"); - } - _metrics._infoReadCount.inc(); - ++_state._infoReadCount; - _state._gottenInitProgress = true; - vespalib::hash_map<api::StorageMessage::Id, Disk>::iterator it( - _state._infoRequests.find(reply.getMsgId())); - if (it == _state._infoRequests.end()) { - LOGBP(warning, "Got bucket info reply for %s, request %" PRIu64 ", that " - "was not registered pending.", - reply.getBucketId().toString().c_str(), reply.getMsgId()); - checkIfDone(); - } else { - uint32_t disk(it->second); - _state._infoRequests.erase(it->first); - auto itr = _readState[disk]->find(bucketSpace); - assert(itr != _readState[disk]->end()); - BucketReadState& state = *itr->second; - BucketSet::iterator it2(state._pending.find(reply.getBucketId())); - if (it2 == state._pending.end()) { - LOGBP(warning, "Got bucket info reply for %s that was registered " - "in global state but not in disk %u's state.", - reply.getBucketId().toString().c_str(), disk); - } else { - state._pending.erase(reply.getBucketId()); - LOG(spam, "Got info reply for %s: %s", - reply.getBucketId().toString().c_str(), - _system.getBucketDatabase(reply.getBucket().getBucketSpace()).get( - reply.getBucketId(), "dbinit.inforeply") - ->getBucketInfo().toString().c_str()); - } - checkIfDone(); - sendReadBucketInfo(spi::PartitionId(disk), bucketSpace); - } -} - -// Always called from worker thread. It holds worker monitor. 
-void -StorageBucketDBInitializer::handleInternalBucketJoinReply( - InternalBucketJoinReply& reply) -{ - _metrics._joinedCount.inc(); - vespalib::hash_map<api::StorageMessage::Id, - InternalBucketJoinCommand::SP>::iterator it( - _state._joins.find(reply.getMsgId())); - if (reply.getResult().failed()) { - LOGBP(warning, "Failed to join multiple copies of %s. One of the " - "versions will not be available: %s", - reply.getBucketId().toString().c_str(), - reply.getResult().toString().c_str()); - } - if (it != _state._joins.end()) { - _state._joins.erase(reply.getMsgId()); - LOG(debug, "Completed internal bucket join for %s. Got bucket info %s", - reply.getBucketId().toString().c_str(), - reply.getBucketInfo().toString().c_str()); - StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(reply.getBucket().getBucketSpace()).get( - reply.getBucketId(), - "StorageBucketDBInitializer::onInternalBucketJoinReply")); - entry->setBucketInfo(reply.getBucketInfo()); - entry.write(); - } else { - LOGBP(warning, "Got internal join reply for %s which was not " - "registered to be pending.", - reply.getBucketId().toString().c_str()); - } - checkIfDone(); -} - -namespace { - -bool -isDone(const StorageBucketDBInitializer::ReadState &readState) -{ - return notDoneCount(readState) == 0; -} - -} - -// Always called from worker thread. It holds worker monitor. -void -StorageBucketDBInitializer::checkIfDone() -{ - if (_state._dirsListed < _state._dirsToList) return; - if (!_state._infoRequests.empty()) return; - if (!_state._joins.empty()) return; - if (!isDone(_readState)) { - return; - } - _state._doneInitializing = true; - _system._doneInitializeHandler.notifyDoneInitializing(); - _metrics._initLatency.addValue(_metrics._startTime.getElapsedTimeAsDouble()); - LOG(debug, "Completed initializing"); -} - -double -StorageBucketDBInitializer::calculateMinProgressFromDiskIterators() const -{ - double minProgress = 1.0; - for (size_t disk = 0; disk < _readState.size(); ++disk) { - if (_readState[disk].get() == 0) { - continue; - } - for (const auto &stateElem : *_readState[disk]) { - const BucketReadState &state = *stateElem.second; - document::BucketId bid(state._databaseIterator); - - double progress; - if (!state._done) { - progress = BucketProgressCalculator::calculateProgress(bid); - } else { - progress = 1.0; - } - - minProgress = std::min(minProgress, progress); - } - } - //std::cerr << "minProgress: " << minProgress << "\n"; - return minProgress; -} - -// Always called from worker thread. It holds worker monitor. -double -StorageBucketDBInitializer::calcInitProgress() const -{ - double listProgress(_state._dirsToList == 0 - ? 0 : _state._dirsListed / _state._dirsToList); - // Do sanity check - if (_state._dirsListed > _state._dirsToList) { - LOG(error, "%" PRIu64 " of %u dirs are reported listed. This is a bug.", - _state._dirsListed, _state._dirsToList); - listProgress = 1.0; - } - double infoProgress(calculateMinProgressFromDiskIterators()); - if (_state._dirsToList > _state._dirsListed - && infoProgress > 0) - { - LOG(debug, "Not done with list step yet. (%" PRIu64 " of %u done). " - "Need to nullify info part of progress so fleetcontroller " - "doesn't think listing is completed.", - _state._dirsListed, _state._dirsToList); - infoProgress = 0; - - // Currently we never honor complete_list_before_starting_read option. - // We might want to do that later, in order to be able to enforce - // waiting to read. 
For instance, if we have a use case where several - // directories map to the same disk, such that reading info is slowing - // down directory listing to such an extent that quick restart isn't - // quick enough anymore. If we do, revert to making this an error if that - // config option is enabled - } - - double listLimit = lib::NodeState::getListingBucketsInitProgressLimit(); - double progress(listLimit * listProgress - + (1.0 - listLimit) * infoProgress); - assert(progress < 1.000000001); - return progress; -} - -// Always called from worker thread. It holds worker monitor. -void -StorageBucketDBInitializer::updateInitProgress() const -{ - double progress = calcInitProgress(); - NodeStateUpdater::Lock::SP lock( - _system._component.getStateUpdater().grabStateChangeLock()); - lib::NodeState ns( - *_system._component.getStateUpdater().getReportedNodeState()); - LOG(debug, "Reporting node init progress as %g", progress); - ns.setInitProgress(progress); - ns.setMinUsedBits(_system._component.getMinUsedBitsTracker() - .getMinUsedBits()); - _system._component.getStateUpdater().setReportedNodeState(ns); -} - -// Always called from worker thread. It holds worker monitor. -void -StorageBucketDBInitializer::handleListingCompleted() -{ - assert(!_state._doneListing); - _state._doneListing = true; - if (_state._dirsToList != _state._dirsListed) { - LOG(warning, "After list phase completed, counters indicate we've " - "listed %" PRIu64 " of %u directories. This is a bug.", - _state._dirsListed, _state._dirsToList); - } - LOG(info, "Completed listing buckets from disk. Minimum used bits is %u", - _system._component.getMinUsedBitsTracker().getMinUsedBits()); - _metrics._listLatency.addValue(_metrics._startTime.getElapsedTimeAsDouble()); -} - -double -StorageBucketDBInitializer:: -BucketProgressCalculator::calculateProgress(const document::BucketId& bid) -{ - uint64_t revBucket(document::BucketId::bucketIdToKey(bid.getId())); - - // Remove unused bits - uint64_t progressBits(revBucket >> (64 - bid.getUsedBits())); -/* - std::cerr << bid << ":\n"; - std::cerr << "revBucket: " << std::hex << revBucket << ", progressBits: " << progressBits - << ", divisor: " << (1ULL << bid.getUsedBits()) - << ", result= " << (static_cast<double>(progressBits) / (1ULL << bid.getUsedBits())) - << "\n"; -*/ - return static_cast<double>(progressBits) / (1ULL << bid.getUsedBits()); -} - -} // storage diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h deleted file mode 100644 index 00d39965151..00000000000 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::StorageBucketDBInitializer - * \ingroup bucketdb - * - * \brief Initializes the bucket database on the storage node. - * - * The storage bucket DB initializer is responsible for initializing the bucket database on - * the storage node. This used to be the task of the bucket manager, but to - * make the implementation cleaner, the logic for this has been separated. - * - * This works as follows: - * - * 1. When component is started (onOpen), partition states should already have - * been acquired from the SPI and made available to this class. Requests for - * listing buckets will be sent to all partitions. Background thread will be - * started to avoid doing processing in the thread sending replies. - * - * 2.
Upon receiving bucket lists into background thread, the bucket database - * will be populated with buckets. Bucket information may at this point be - * invalid or not, depending on persistence provider. Providers that can list - * cheaply but where getting info is more expensive will likely want to return - * invalid entries, as the node can start handling load as soon as bucket lists - * are known. Providers that get info and bucket lists equally cheaply will likely - * prefer to give the info at once to avoid the read step. - * - * 3. Upon receiving the last bucket list, the background thread will be started - * to do remaining work. - * - * 4. Background thread will iterate through the bucket database, issuing - * bucket info requests for all buckets that have invalid bucket info. Once the - * whole bucket database has been iterated and there are no longer pending - * operations, initialization is complete, and the node will be tagged as up. - */ - -#pragma once - -#include <atomic> -#include <vespa/vespalib/util/document_runnable.h> -#include <vespa/metrics/metrics.h> -#include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/storage/bucketdb/minimumusedbitstracker.h> -#include <vespa/storage/common/bucketmessages.h> -#include <vespa/storage/common/doneinitializehandler.h> -#include <vespa/storage/common/servicelayercomponent.h> -#include <vespa/storage/common/storagelink.h> -#include <vespa/storageframework/generic/status/htmlstatusreporter.h> -#include <vespa/storageframework/generic/clock/timer.h> -#include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vdslib/state/nodestate.h> -#include <vespa/config/subscription/configuri.h> -#include <list> -#include <unordered_map> -#include <mutex> -#include <condition_variable> - -namespace storage { - -struct BucketReadState; - -class StorageBucketDBInitializer : public StorageLink, - public framework::HtmlStatusReporter, - private framework::Runnable -{ - typedef uint16_t Disk; - typedef vespalib::hash_map<api::StorageMessage::Id, Disk> IdDiskMap; - - struct Config { - // List priority should be larger than info priority.
- uint16_t _listPriority; - uint16_t _infoReadPriority; - // When going below this amount of pending, send more until we reach max - uint16_t _minPendingInfoReadsPerDisk; - uint16_t _maxPendingInfoReadsPerDisk; - - Config(const config::ConfigUri & configUri); - }; - struct System { - DoneInitializeHandler& _doneInitializeHandler; - ServiceLayerComponent _component; - const ContentBucketSpaceRepo& _bucketSpaceRepo; - uint32_t _nodeIndex; - lib::NodeState _nodeState; // Disk info for ideal state calculations - framework::Thread::UP _thread; - - System(DoneInitializeHandler& doneInitializeHandler, - ServiceLayerComponentRegister&, - const Config&); - ~System(); - - StorBucketDatabase &getBucketDatabase(document::BucketSpace bucketSpace) const; - }; - struct Metrics : public metrics::MetricSet { - metrics::LongCountMetric _wrongDisk; - metrics::LongCountMetric _insertedCount; - metrics::LongCountMetric _joinedCount; - metrics::LongCountMetric _infoReadCount; - metrics::LongCountMetric _infoSetByLoad; - metrics::LongCountMetric _dirsListed; - framework::MilliSecTimer _startTime; - metrics::LongAverageMetric _listLatency; - metrics::LongAverageMetric _initLatency; - - Metrics(framework::Component&); - ~Metrics(); - }; - struct GlobalState { - vespalib::hash_map<api::StorageMessage::Id, ReadBucketList::SP> _lists; - vespalib::hash_map<api::StorageMessage::Id, InternalBucketJoinCommand::SP> _joins; - IdDiskMap _infoRequests; - std::list<api::StorageMessage::SP> _replies; - uint64_t _insertedCount; - uint64_t _infoReadCount; - uint64_t _infoSetByLoad; - uint64_t _dirsListed; - uint32_t _dirsToList; - bool _gottenInitProgress; - std::atomic<bool> _doneListing; - bool _doneInitializing; - // This lock is held while the worker thread is working, such that - // status retrieval can lock it. Listing part only grabs it when - // needed, to support listing in multiple threads - mutable std::mutex _workerLock; - std::condition_variable _workerCond; - // This lock protects the reply list. - std::mutex _replyLock; - - GlobalState(); - ~GlobalState(); - }; - -public: - using BucketSpaceReadState = std::unordered_map<document::BucketSpace, - std::unique_ptr<BucketReadState>, document::BucketSpace::hash>; - using ReadState = std::vector<std::unique_ptr<BucketSpaceReadState>>; - -private: - Config _config; - System _system; - Metrics _metrics; - GlobalState _state; - ReadState _readState; - -public: - StorageBucketDBInitializer(const config::ConfigUri&, - DoneInitializeHandler&, - ServiceLayerComponentRegister&); - ~StorageBucketDBInitializer(); - - void print(std::ostream& out, bool verbose, const std::string& indent) const override; - - void onOpen() override; - void onClose() override; - - void run(framework::ThreadHandle&) override; - - bool onDown(const std::shared_ptr<api::StorageMessage>&) override; - bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; - - void handleReadBucketListReply(ReadBucketListReply&); - void handleReadBucketInfoReply(ReadBucketInfoReply&); - void handleInternalBucketJoinReply(InternalBucketJoinReply&); - - /** Status implementation. */ - void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; - - // The below functions should only be called by the class itself, but they - // are left public for ease of access for unit tests and anonymous - // classes defined in implementation. - - /** Get the path of a given directory.
*/ - std::string getPathName(std::vector<uint32_t>& path, - const document::BucketId* = 0) const; - /** Process a given file found through listing files on disk */ - bool processFile(std::vector<uint32_t>& path, const std::string& pathName, - const std::string& name); - /** - * Find which bucket identifier a file corresponds to. - * Invalid bucket indicates none. (Invalid file name) - */ - document::BucketId extractBucketId(const std::string& name) const; - /** - * Handle that the bucket might have been found in the wrong position. - * Returns true if we should attempt to register the bucket. - */ - bool handleBadLocation(const document::BucketId&, - std::vector<uint32_t>& path); - /** Register a bucket in the bucket database. */ - void registerBucket(const document::Bucket &bucket, - const lib::Distribution &distribution, - spi::PartitionId, - api::BucketInfo bucketInfo); - /** - * Sends more read bucket info to a given disk. Lock must already be taken. - * Will be released by function prior to sending messages down. - */ - void sendReadBucketInfo(spi::PartitionId, document::BucketSpace bucketSpace); - /** Check whether initialization is complete. Should hold lock to call it.*/ - void checkIfDone(); - - /** Calculate minimum progress from all disks' bucket db iterators */ - double calculateMinProgressFromDiskIterators() const; - /** Calculate how far we have progressed initializing. */ - double calcInitProgress() const; - /** Update node state if init progress has changed enough. */ - void updateInitProgress() const; - /** Handle that we're done listing buckets. */ - void handleListingCompleted(); - - /** Used for unit tests to see that stuff has happened. */ - virtual const Metrics& getMetrics() const { return _metrics; } - - - class BucketProgressCalculator - { - public: - /** - * Estimate progress into the total bucket space. - * Done by taking reverse bucket key, shifting away unused bits and - * dividing the result by 2**used bits to get approximate progress. - * @param bid Current bucket space iterator/cursor. - */ - static double calculateProgress(const document::BucketId& bid); - }; -}; - -} // storage diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 6c48add6f53..e7ce6737ac8 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -13,7 +13,6 @@ #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/storage/bucketdb/storagebucketdbinitializer.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/persistence/spi/exceptions.h>
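A note on the removed progress math: BucketProgressCalculator::calculateProgress maps a bucket id to a value in [0, 1) by reversing the bit order of its used bits (via document::BucketId::bucketIdToKey) and dividing by 2^usedBits, so buckets visited in key order yield a monotonically increasing estimate. Below is a minimal self-contained sketch (hypothetical names; the toy reverse_used_bits stands in for the real 64-bit key reversal) that reproduces the expectations from the deleted bucket_progress_calculator test:

    #include <cassert>
    #include <cstdint>

    // Toy stand-in for the bucket-key transform: mirror the bit order of
    // the low used_bits of the raw id, so the least significant location
    // bit becomes the most significant bit of the result.
    static uint64_t reverse_used_bits(uint64_t raw_id, uint32_t used_bits) {
        uint64_t reversed = 0;
        for (uint32_t i = 0; i < used_bits; ++i) {
            reversed = (reversed << 1) | ((raw_id >> i) & 1);
        }
        return reversed;
    }

    // Progress into the bucket space: reversed bits divided by 2^used_bits.
    // Never reaches 1.0; completion is signaled by other logic.
    static double calculate_progress(uint32_t used_bits, uint64_t raw_id) {
        return static_cast<double>(reverse_used_bits(raw_id, used_bits))
               / (1ULL << used_bits);
    }

    int main() {
        // Same vectors as the removed bucket_progress_calculator test.
        assert(calculate_progress(1, 0) == 0.0);
        assert(calculate_progress(1, 1) == 0.5);
        assert(calculate_progress(2, 2) == 0.25);
        assert(calculate_progress(2, 1) == 0.5);
        assert(calculate_progress(2, 3) == 0.75);
        assert(calculate_progress(3, 7) == 0.875);
        return 0;
    }

Every expected value is an exact binary fraction, so the exact double comparisons above are safe.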