author     Tor Brede Vekterli <vekterli@verizonmedia.com>    2020-10-16 11:44:48 +0000
committer  Tor Brede Vekterli <vekterli@verizonmedia.com>    2020-10-19 11:59:35 +0000
commit     6a46ee89f4307e151dc830fb3624d4cdf3272ad0 (patch)
tree       92d4af35a1d6d330d46d274b8867973b88f58424
parent     fb6776f5fda4522a1bf210713e8ca574b6d199dd (diff)
Remove legacy bucket DB initializer component
-rw-r--r--  storage/src/tests/bucketdb/CMakeLists.txt                            1
-rw-r--r--  storage/src/tests/bucketdb/initializertest.cpp                     596
-rw-r--r--  storage/src/tests/common/teststorageapp.cpp                          3
-rw-r--r--  storage/src/vespa/storage/bucketdb/CMakeLists.txt                    1
-rw-r--r--  storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp  844
-rw-r--r--  storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h    227
-rw-r--r--  storage/src/vespa/storage/storageserver/servicelayernode.cpp        1
7 files changed, 1 insertion, 1672 deletions
diff --git a/storage/src/tests/bucketdb/CMakeLists.txt b/storage/src/tests/bucketdb/CMakeLists.txt
index a182fed02ea..2acaea1940d 100644
--- a/storage/src/tests/bucketdb/CMakeLists.txt
+++ b/storage/src/tests/bucketdb/CMakeLists.txt
@@ -5,7 +5,6 @@ vespa_add_executable(storage_bucketdb_gtest_runner_app TEST
bucketinfotest.cpp
bucketmanagertest.cpp
gtest_runner.cpp
- initializertest.cpp
judyarraytest.cpp
judymultimaptest.cpp
lockablemaptest.cpp
diff --git a/storage/src/tests/bucketdb/initializertest.cpp b/storage/src/tests/bucketdb/initializertest.cpp
deleted file mode 100644
index a39be7910f1..00000000000
--- a/storage/src/tests/bucketdb/initializertest.cpp
+++ /dev/null
@@ -1,596 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * Tests storage initialization without depending on the persistence layer.
- */
-#include <vespa/document/base/testdocman.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/storage/bucketdb/lockablemap.hpp>
-#include <vespa/storage/bucketdb/storagebucketdbinitializer.h>
-#include <vespa/storage/persistence/filestorage/filestormanager.h>
-#include <vespa/storageapi/message/bucket.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/state.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/dummystoragelink.h>
-#include <tests/common/testhelper.h>
-#include <vespa/vdstestlib/config/dirconfig.hpp>
-#include <vespa/vespalib/gtest/gtest.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".test.bucketdb.initializing");
-
-using document::FixedBucketSpaces;
-using namespace ::testing;
-
-namespace storage {
-
-typedef uint16_t PartitionId;
-
-struct InitializerTest : public Test {
-
- class InitParams {
- vdstestlib::DirConfig config;
- bool configFinalized;
-
- public:
- uint32_t bucketBitsUsed;
- NodeIndex nodeIndex;
- NodeCount nodeCount;
- Redundancy redundancy;
- uint32_t docsPerDisk;
- DiskCount diskCount;
- bool bucketWrongDisk;
- bool bucketMultipleDisks;
- bool failingListRequest;
- bool failingInfoRequest;
-
- InitParams()
- : config(getStandardConfig(true)),
- configFinalized(false),
- bucketBitsUsed(4),
- nodeIndex(0),
- nodeCount(10),
- redundancy(2),
- docsPerDisk(10),
- diskCount(1),
- bucketWrongDisk(false),
- bucketMultipleDisks(false),
- failingListRequest(false),
- failingInfoRequest(false)
- {}
-
- vdstestlib::DirConfig& getConfig() {
- if (!configFinalized) {
- config.getConfig("stor-server")
- .setValue("node_index", nodeIndex);
- config.getConfig("stor-distribution")
- .setValue("redundancy", redundancy);
- configFinalized = true;
- }
- return config;
- }
-
- };
-
- document::TestDocMan _docMan;
-
- void do_test_initialization(InitParams& params);
-};
-
-TEST_F(InitializerTest, init_with_empty_node) {
- InitParams params;
- params.docsPerDisk = 0;
- do_test_initialization(params);
-}
-
-TEST_F(InitializerTest, init_with_data_on_single_disk) {
- InitParams params;
- params.diskCount = DiskCount(1);
- do_test_initialization(params);
-}
-
-TEST_F(InitializerTest, init_with_multiple_disks) {
- InitParams params;
- do_test_initialization(params);
-}
-
-TEST_F(InitializerTest, init_with_bucket_on_wrong_disk) {
- InitParams params;
- params.bucketWrongDisk = true;
- params.bucketBitsUsed = 58;
- do_test_initialization(params);
-}
-
-namespace {
-// Data kept on buckets we're using in test.
-struct BucketData {
- api::BucketInfo info;
-
- BucketData() : info(0, 0, 0, 0, 0) {
- }
-
- BucketData operator+(const BucketData& other) const {
- BucketData copy;
- copy.info.setDocumentCount(
- info.getDocumentCount() + other.info.getDocumentCount());
- copy.info.setTotalDocumentSize(
- info.getTotalDocumentSize()
- + other.info.getTotalDocumentSize());
- copy.info.setChecksum(
- info.getChecksum() * other.info.getChecksum());
- return copy;
- }
-};
-// Data residing on one disk
-typedef std::map<document::BucketId, BucketData> DiskData;
-struct BucketInfoLogger {
- std::map<PartitionId, DiskData>& map;
-
- explicit BucketInfoLogger(std::map<PartitionId, DiskData>& m)
- : map(m) {}
-
- StorBucketDatabase::Decision operator()(
- uint64_t revBucket, const StorBucketDatabase::Entry& entry)
- {
- document::BucketId bucket(document::BucketId::keyToBucketId(revBucket));
- assert(bucket.getRawId() != 0);
- assert(entry.getBucketInfo().valid());
- DiskData& ddata(map[0]);
- BucketData& bdata(ddata[bucket]);
- bdata.info = entry.getBucketInfo();
- return StorBucketDatabase::Decision::CONTINUE;
- }
-};
-std::map<PartitionId, DiskData>
-createMapFromBucketDatabase(StorBucketDatabase& db) {
- std::map<PartitionId, DiskData> result;
- BucketInfoLogger infoLogger(result);
- db.acquire_read_guard()->for_each(std::ref(infoLogger));
- return result;
-}
-// Create data we want to have in this test
-std::map<PartitionId, DiskData>
-buildBucketInfo(const document::TestDocMan& docMan,
- InitializerTest::InitParams& params)
-{
- std::map<PartitionId, DiskData> result;
- for (uint32_t i=0; i<params.diskCount; ++i) {
- result[i];
- }
- lib::Distribution distribution(
- lib::Distribution::getDefaultDistributionConfig(
- params.redundancy, params.nodeCount));
- document::BucketIdFactory bucketIdFactory;
- lib::NodeState nodeState;
- nodeState.setDiskCount(params.diskCount);
-
- uint64_t totalDocs = params.docsPerDisk * params.diskCount;
- for (uint32_t i=0, n=totalDocs; i<n; ++i) {
- bool useWrongDisk = false;
- if (i == 1 && params.bucketWrongDisk) {
- useWrongDisk = true;
- }
- document::Document::SP doc(docMan.createRandomDocument(i));
- if (i == 3 && params.bucketMultipleDisks) {
- doc = docMan.createRandomDocument(i - 1);
- useWrongDisk = true;
- }
- document::BucketId bid(bucketIdFactory.getBucketId(doc->getId()));
- bid.setUsedBits(params.bucketBitsUsed);
- bid = bid.stripUnused();
- uint32_t partition(distribution.getIdealDisk(
- nodeState, params.nodeIndex, bid,
- lib::Distribution::IDEAL_DISK_EVEN_IF_DOWN));
- if (useWrongDisk) {
- int correctPart = partition;
-            partition = (partition + 1) % params.diskCount;
- LOG(debug, "Putting bucket %s on wrong disk %u instead of %u",
- bid.toString().c_str(), partition, correctPart);
- }
- LOG(debug, "Putting bucket %s on disk %u",
- bid.toString().c_str(), partition);
- BucketData& data(result[partition][bid]);
- data.info.setDocumentCount(data.info.getDocumentCount() + 1);
- data.info.setTotalDocumentSize(
- data.info.getTotalDocumentSize() + 100);
- data.info.setChecksum(data.info.getChecksum() * 3);
- }
- return result;
-}
-void verifyEqual(std::map<PartitionId, DiskData>& org,
- std::map<PartitionId, DiskData>& existing)
-{
- uint32_t equalCount = 0;
- std::map<PartitionId, DiskData>::const_iterator part1(org.begin());
- std::map<PartitionId, DiskData>::const_iterator part2(existing.begin());
- while (part1 != org.end() && part2 != existing.end()) {
- if (part1->first < part2->first) {
- if (!part1->second.empty()) {
- FAIL() << "No data in partition " << part1->first << " found.";
- }
- ++part1;
- } else if (part1->first > part2->first) {
- if (!part2->second.empty()) {
- FAIL() << "Found data in partition " << part2->first
- << " which should not exist.";
- }
- ++part2;
- } else {
- auto bucket1 = part1->second.begin();
- auto bucket2 = part2->second.begin();
- while (bucket1 != part1->second.end()
- && bucket2 != part2->second.end())
- {
- if (bucket1->first < bucket2->first) {
- FAIL() << "No data in partition " << part1->first
- << " for bucket " << bucket1->first << " found.";
- } else if (bucket1->first.getId() > bucket2->first.getId())
- {
- FAIL() << "Found data in partition " << part2->first
- << " for bucket " << bucket2->first
- << " which should not exist.";
- } else if (!(bucket1->second.info == bucket2->second.info)) {
- FAIL() << "Bucket " << bucket1->first << " on partition "
- << part1->first << " has bucket info "
- << bucket2->second.info << " and not "
- << bucket1->second.info << " as expected.";
- }
- ++bucket1;
- ++bucket2;
- ++equalCount;
- }
- if (bucket1 != part1->second.end()) {
- FAIL() << "No data in partition " << part1->first
- << " for bucket " << bucket1->first << " found.";
- }
- if (bucket2 != part2->second.end()) {
- FAIL() << "Found data in partition " << part2->first
- << " for bucket " << bucket2->first
- << " which should not exist.";
- }
- ++part1;
- ++part2;
- }
- }
- if (part1 != org.end() && !part1->second.empty()) {
- FAIL() << "No data in partition " << part1->first << " found.";
- }
- if (part2 != existing.end() && !part2->second.empty()) {
- FAIL() << "Found data in partition " << part2->first
- << " which should not exist.";
- }
-}
-
-struct MessageCallback
-{
-public:
- virtual ~MessageCallback() = default;
- virtual void onMessage(const api::StorageMessage&) = 0;
-};
-
-struct FakePersistenceLayer : public StorageLink {
- StorBucketDatabase& bucketDatabase;
- std::map<PartitionId, DiskData>& data;
- std::string firstFatal;
- std::string fatalError;
- MessageCallback* messageCallback;
-
- FakePersistenceLayer(std::map<PartitionId, DiskData>& d,
- StorBucketDatabase& db)
- : StorageLink("fakepersistencelayer"),
- bucketDatabase(db),
- data(d),
- messageCallback(0)
- {
- }
-
- void fatal(vespalib::stringref error) {
- fatalError = error;
- if (firstFatal.empty()) firstFatal = fatalError;
- }
- const BucketData* getBucketData(PartitionId partition,
- const document::BucketId& bucket,
- vespalib::stringref opname)
- {
- std::map<PartitionId, DiskData>::const_iterator it(
- data.find(partition));
- if (it == data.end()) {
- std::ostringstream ost;
- ost << bucket << " is stated to be on partition " << partition
- << " in operation " << opname << ", but we have no data for "
- << "it there.";
- fatal(ost.str());
- } else {
- auto it2 = it->second.find(bucket);
- if (it2 == it->second.end()) {
- std::ostringstream ost;
- ost << "Have no data for " << bucket << " on disk " << partition
- << " in operation " << opname;
- fatal(ost.str());
- } else {
- const BucketData& bucketData(it2->second);
- return &bucketData;
- }
- }
- return 0;
- }
-
- bool onDown(const api::StorageMessage::SP& msg) override {
- fatalError = "";
- if (messageCallback) {
- messageCallback->onMessage(*msg);
- }
- if (msg->getType() == api::MessageType::INTERNAL) {
- auto& cmd = dynamic_cast<api::InternalCommand&>(*msg);
- if (cmd.getType() == ReadBucketList::ID) {
- auto& rbl = dynamic_cast<ReadBucketList&>(cmd);
- ReadBucketListReply::SP reply(new ReadBucketListReply(rbl));
- std::map<PartitionId, DiskData>::const_iterator it(
- data.find(rbl.getPartition()));
- if (it == data.end()) {
- std::ostringstream ost;
- ost << "Got list request to partition "
- << rbl.getPartition()
- << " for which we should not get a request";
- fatal(ost.str());
- } else {
- if (cmd.getBucket().getBucketSpace() == FixedBucketSpaces::default_space()) {
- for (const auto& bd : it->second) {
- reply->getBuckets().push_back(bd.first);
- }
- }
- }
- if (!fatalError.empty()) {
- reply->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, fatalError));
- }
- sendUp(reply);
- } else if (cmd.getType() == ReadBucketInfo::ID) {
- auto& rbi = dynamic_cast<ReadBucketInfo&>(cmd);
- ReadBucketInfoReply::SP reply(new ReadBucketInfoReply(rbi));
- StorBucketDatabase::WrappedEntry entry(
- bucketDatabase.get(rbi.getBucketId(), "fakelayer"));
- if (!entry.exist()) {
- fatal("Bucket " + rbi.getBucketId().toString()
- + " did not exist in bucket database but we got "
- + "read bucket info request for it.");
- } else {
- const BucketData* bucketData(getBucketData(0, rbi.getBucketId(), "readbucketinfo"));
- if (bucketData != 0) {
- entry->setBucketInfo(bucketData->info);
- entry.write();
- }
- }
- if (!fatalError.empty()) {
- reply->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, fatalError));
- }
- sendUp(reply);
- } else if (cmd.getType() == InternalBucketJoinCommand::ID) {
- auto& ibj = dynamic_cast<InternalBucketJoinCommand&>(cmd);
- InternalBucketJoinReply::SP reply(
- new InternalBucketJoinReply(ibj));
- StorBucketDatabase::WrappedEntry entry(
- bucketDatabase.get(ibj.getBucketId(), "fakelayer"));
- if (!entry.exist()) {
- fatal("Bucket " + ibj.getBucketId().toString()
- + " did not exist in bucket database but we got "
- + "read bucket info request for it.");
- } else {
- const BucketData* source(getBucketData(
- ibj.getDiskOfInstanceToJoin(), ibj.getBucketId(),
- "internaljoinsource"));
- const BucketData* target(getBucketData(
- ibj.getDiskOfInstanceToKeep(), ibj.getBucketId(),
- "internaljointarget"));
- if (source != 0 && target != 0) {
- entry->setBucketInfo((*source + *target).info);
- entry.write();
- }
- }
- if (!fatalError.empty()) {
- reply->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, fatalError));
- }
- sendUp(reply);
- } else {
- return false;
- }
- return true;
- }
- return false;
- }
-};
-
-} // end of anonymous namespace
-
-void
-InitializerTest::do_test_initialization(InitParams& params)
-{
- std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params));
-
- assert(params.diskCount == 1u);
- TestServiceLayerApp node(params.nodeIndex, params.getConfig().getConfigId());
- DummyStorageLink top;
- StorageBucketDBInitializer* initializer;
- FakePersistenceLayer* bottom;
- top.push_back(StorageLink::UP(initializer = new StorageBucketDBInitializer(
- params.getConfig().getConfigId(),
- node.getDoneInitializeHandler(),
- node.getComponentRegister())));
- top.push_back(StorageLink::UP(bottom = new FakePersistenceLayer(
- data, node.getStorageBucketDatabase())));
-
- LOG(debug, "STARTING INITIALIZATION");
- top.open();
-
- node.waitUntilInitialized(initializer);
-
- std::map<PartitionId, DiskData> initedBucketDatabase(
- createMapFromBucketDatabase(node.getStorageBucketDatabase()));
- verifyEqual(data, initedBucketDatabase);
-}
-
-TEST_F(InitializerTest, bucket_progress_calculator) {
- using document::BucketId;
- StorageBucketDBInitializer::BucketProgressCalculator calc;
- // We consider the given bucket as not being completed, so progress
- // will be _up to_, not _including_ the bucket. This means we can never
- // reach 1.0, so progress completion must be handled by other logic!
- EXPECT_DOUBLE_EQ(0.0, calc.calculateProgress(BucketId(1, 0)));
- EXPECT_DOUBLE_EQ(0.0, calc.calculateProgress(BucketId(32, 0)));
-
- EXPECT_DOUBLE_EQ(0.5, calc.calculateProgress(BucketId(1, 1)));
-
- EXPECT_DOUBLE_EQ(0.25, calc.calculateProgress(BucketId(2, 2)));
- EXPECT_DOUBLE_EQ(0.5, calc.calculateProgress(BucketId(2, 1)));
- EXPECT_DOUBLE_EQ(0.75, calc.calculateProgress(BucketId(2, 3)));
-
- EXPECT_DOUBLE_EQ(0.875, calc.calculateProgress(BucketId(3, 7)));
-}
-
-struct DatabaseInsertCallback : MessageCallback
-{
- DiskData& _data;
- StorBucketDatabase& _database;
- TestServiceLayerApp& _app;
- const InitializerTest::InitParams& _params;
- bool _invoked;
- double _lastSeenProgress;
- uint8_t _expectedReadBucketPriority;
- std::ostringstream _errors;
- DatabaseInsertCallback(DiskData& data,
- StorBucketDatabase& db,
- TestServiceLayerApp& app,
- const InitializerTest::InitParams& params)
- : _data(data),
- _database(db),
- _app(app),
- _params(params),
- _invoked(false),
- _lastSeenProgress(0),
- _expectedReadBucketPriority(255)
- {}
-
- void onMessage(const api::StorageMessage& msg) override
- {
- // Always make sure we're not set as initialized while we're still
- // processing messages! Also ensure progress never goes down.
- lib::NodeState::CSP reportedState(
- _app.getStateUpdater().getReportedNodeState());
- double progress(reportedState->getInitProgress().getValue());
- LOG(debug, "reported progress is now %g", progress);
- if (progress >= 1.0) {
- _errors << "progress exceeded 1.0: " << progress << "\n";
- }
- if (progress < _lastSeenProgress) {
- _errors << "progress went down! "
- << _lastSeenProgress << " -> " << progress
- << "\n";
- }
- // 16 bits is allowed before we have listed any buckets at all
- // since we at that point have no idea and have not reported anything
- // back to the fleetcontroller.
- if (_params.bucketBitsUsed != reportedState->getMinUsedBits()
- && !(reportedState->getMinUsedBits() == 16 && !_invoked))
- {
- _errors << "reported state contains wrong min used bits. "
- << "expected " << _params.bucketBitsUsed
- << ", but got " << reportedState->getMinUsedBits()
- << "\n";
- }
- _lastSeenProgress = progress;
- if (_invoked) {
- return;
- }
-
- if (msg.getType() == api::MessageType::INTERNAL) {
- auto& cmd = dynamic_cast<const api::InternalCommand&>(msg);
- if (cmd.getType() == ReadBucketInfo::ID) {
- if (cmd.getPriority() != _expectedReadBucketPriority) {
- _errors << "expected ReadBucketInfo priority of "
- << static_cast<int>(_expectedReadBucketPriority)
- << ", was " << static_cast<int>(cmd.getPriority());
- }
- // As soon as we get the first ReadBucketInfo, we insert new buckets
-                // into the bucket database in order to simulate external
- // load init. Kinda hacky, but should work as long as initializer
- // always does at least 1 extra iteration pass (which we use
- // config overrides to ensure happens).
- _invoked = true;
- for (int i = 0; i < 4; ++i) {
- document::BucketId bid(16 + i, 8); // not the first, nor the last bucket
- BucketData d;
- StorBucketDatabase::WrappedEntry entry(
- _database.get(bid, "DatabaseInsertCallback::onMessage",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
- if (entry.preExisted()) {
- _errors << "db entry for " << bid << " already existed";
- }
- if (i < 5) {
- d.info = api::BucketInfo(3+i, 4+i, 5+i, 6+i, 7+i);
- }
- _data[bid] = d;
- entry->setBucketInfo(d.info);
- entry.write();
- }
- }
- }
- }
-};
-
-TEST_F(InitializerTest, buckets_initialized_by_load) {
- InitParams params;
- params.docsPerDisk = 100;
- params.diskCount = DiskCount(1);
- params.getConfig().getConfig("stor-bucket-init").setValue("max_pending_info_reads_per_disk", 1);
- params.getConfig().getConfig("stor-bucket-init").setValue("min_pending_info_reads_per_disk", 1);
- params.getConfig().getConfig("stor-bucket-init")
- .setValue("info_read_priority", 231);
-
- std::map<PartitionId, DiskData> data(buildBucketInfo(_docMan, params));
-
- assert(params.diskCount == 1u);
- TestServiceLayerApp node(params.nodeIndex,
- params.getConfig().getConfigId());
- DummyStorageLink top;
- StorageBucketDBInitializer* initializer;
- FakePersistenceLayer* bottom;
- top.push_back(StorageLink::UP(initializer = new StorageBucketDBInitializer(
- params.getConfig().getConfigId(),
- node.getDoneInitializeHandler(),
- node.getComponentRegister())));
- top.push_back(StorageLink::UP(bottom = new FakePersistenceLayer(
- data, node.getStorageBucketDatabase())));
-
- DatabaseInsertCallback callback(data[0], node.getStorageBucketDatabase(),
- node, params);
- callback._expectedReadBucketPriority = 231;
-
- bottom->messageCallback = &callback;
-
- top.open();
-
- node.waitUntilInitialized(initializer);
- // Must explicitly wait until initializer has closed to ensure node state
- // has been set.
- top.close();
-
- ASSERT_TRUE(callback._invoked);
- EXPECT_EQ(std::string(), callback._errors.str());
-
- std::map<PartitionId, DiskData> initedBucketDatabase(
- createMapFromBucketDatabase(node.getStorageBucketDatabase()));
- verifyEqual(data, initedBucketDatabase);
-
- lib::NodeState::CSP reportedState(
- node.getStateUpdater().getReportedNodeState());
-
- double progress(reportedState->getInitProgress().getValue());
- EXPECT_GE(progress, 1.0);
- EXPECT_LT(progress, 1.0001);
-
- EXPECT_EQ(params.bucketBitsUsed, reportedState->getMinUsedBits());
-}
-
-} // storage
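The bucket_progress_calculator expectations in the deleted test above follow directly from how the removed BucketProgressCalculator worked: take the bucket's used location bits, reverse them (mirroring the bucket-key ordering), and divide by 2^usedBits. Below is a minimal standalone sketch reproducing those expectations; reverse_bits and progress are hypothetical helpers for illustration, not part of the Vespa API. All expected values are exact powers of two, so the double comparisons are exact.

#include <cassert>
#include <cstdint>

// Reverse the lowest usedBits bits of id, mirroring how a bucket key orders
// the used location bits most-significant-first.
static uint64_t reverse_bits(uint64_t id, uint32_t usedBits) {
    uint64_t rev = 0;
    for (uint32_t i = 0; i < usedBits; ++i) {
        rev = (rev << 1) | ((id >> i) & 1);
    }
    return rev;
}

// Progress is how far into the reversed-key space the iterator has come.
// It can approach but never reach 1.0, so completion must be signalled
// separately, exactly as the comment in the removed test notes.
static double progress(uint64_t id, uint32_t usedBits) {
    return static_cast<double>(reverse_bits(id, usedBits))
         / static_cast<double>(uint64_t(1) << usedBits);
}

int main() {
    assert(progress(0, 1) == 0.0);    // BucketId(1, 0)
    assert(progress(1, 1) == 0.5);    // BucketId(1, 1)
    assert(progress(2, 2) == 0.25);   // BucketId(2, 2): 0b10 reversed -> 0b01
    assert(progress(1, 2) == 0.5);    // BucketId(2, 1): 0b01 reversed -> 0b10
    assert(progress(3, 2) == 0.75);   // BucketId(2, 3)
    assert(progress(7, 3) == 0.875);  // BucketId(3, 7)
    return 0;
}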
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index edea33491af..a2a0cbd1e68 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "teststorageapp.h"
-#include <vespa/storage/bucketdb/storagebucketdbinitializer.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/config-stor-distribution.h>
#include <vespa/config-load-type.h>
@@ -12,6 +11,7 @@
#include <vespa/config/config.h>
#include <vespa/config/helper/configgetter.hpp>
#include <thread>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".test.servicelayerapp");
@@ -116,7 +116,6 @@ TestStorageApp::waitUntilInitialized(
<< timeout << " seconds.";
if (initializer != 0) {
error << " ";
- initializer->reportStatus(error, framework::HttpUrlPath(""));
LOG(error, "%s", error.str().c_str());
throw std::runtime_error(error.str());
}
diff --git a/storage/src/vespa/storage/bucketdb/CMakeLists.txt b/storage/src/vespa/storage/bucketdb/CMakeLists.txt
index 048cc25ec95..5f491d80697 100644
--- a/storage/src/vespa/storage/bucketdb/CMakeLists.txt
+++ b/storage/src/vespa/storage/bucketdb/CMakeLists.txt
@@ -11,7 +11,6 @@ vespa_add_library(storage_bucketdb OBJECT
generic_btree_bucket_database.cpp
judyarray.cpp
lockablemap.cpp
- storagebucketdbinitializer.cpp
storbucketdb.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
deleted file mode 100644
index 0761d845ce6..00000000000
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
+++ /dev/null
@@ -1,844 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storagebucketdbinitializer.h"
-#include "lockablemap.hpp"
-#include "config-stor-bucket-init.h"
-#include "storbucketdb.h"
-#include <vespa/storage/common/nodestateupdater.h>
-#include <vespa/storage/common/content_bucket_space_repo.h>
-#include <vespa/storage/storageserver/storagemetricsset.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/config/config.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
-#include <vespa/config/helper/configgetter.hpp>
-#include <iomanip>
-#include <chrono>
-
-#include <vespa/log/bufferedlogger.h>
-LOG_SETUP(".storage.bucketdb.initializer");
-
-using document::BucketSpace;
-using namespace std::chrono_literals;
-
-namespace storage {
-
-using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>;
-
-struct BucketReadState {
- using UP = std::unique_ptr<BucketReadState>;
-
- BucketSet _pending;
- document::BucketId _databaseIterator;
- bool _done;
-
- BucketReadState() : _done(false) {}
-};
-
-using vespa::config::content::core::StorBucketInitConfig;
-
-StorageBucketDBInitializer::Config::Config(const config::ConfigUri & configUri)
- : _listPriority(0),
- _infoReadPriority(255),
- _minPendingInfoReadsPerDisk(16),
- _maxPendingInfoReadsPerDisk(32)
-{
- auto config = config::ConfigGetter<StorBucketInitConfig>::getConfig(configUri.getConfigId(),
- configUri.getContext());
- _maxPendingInfoReadsPerDisk = config->maxPendingInfoReadsPerDisk;
- _minPendingInfoReadsPerDisk = config->minPendingInfoReadsPerDisk;
- _infoReadPriority = config->infoReadPriority;
- _listPriority = config->listPriority;
- if (config->completeListBeforeStartingRead) {
- LOG(warning, "This config option is currently not honored. Info "
- "reading will always start on a directory as soon as "
- "it is done listing.");
- }
- LOG(debug, "Initializing bucket database: List priority %u, info priority "
- "%u, min/max pending info per disk %u/%u.",
- _listPriority, _infoReadPriority,
- _minPendingInfoReadsPerDisk, _maxPendingInfoReadsPerDisk);
-}
-
-StorageBucketDBInitializer::System::~System() = default;
-
-StorageBucketDBInitializer::System::System(
- DoneInitializeHandler& doneInitializeHandler,
- ServiceLayerComponentRegister& compReg,
- const Config&)
- : _doneInitializeHandler(doneInitializeHandler),
- _component(compReg, "storagebucketdbinitializer"),
- _bucketSpaceRepo(_component.getBucketSpaceRepo()),
- _nodeIndex(_component.getIndex()),
- _nodeState()
-{
- // Is this correct? We should get the node state from the node state updater
- // so it could work with disk capacities. Object is used to check for
- // correct disk further down (in the case of internal join, deciding which
- // should have it). Not that bad if wrong disk is picked though.
- _nodeState.setDiskCount(1);
-}
-
-StorBucketDatabase &
-StorageBucketDBInitializer::System::getBucketDatabase(document::BucketSpace bucketSpace) const
-{
- return _component.getBucketDatabase(bucketSpace);
-}
-
-StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component)
- : metrics::MetricSet("dbinit", {},
- "Metrics for the storage bucket database initializer"),
- _wrongDisk("wrongdisk", {},
- "Number of buckets found on non-ideal disk.", this),
- _insertedCount("insertedcount", {},
- "Number of buckets inserted into database in list step.", this),
- _joinedCount("joinedcount", {},
- "Number of buckets found in list step already found "
- "(added from other disks).", this),
- _infoReadCount("infocount", {},
- "Number of buckets we have read bucket information from.", this),
- _infoSetByLoad("infosetbyload", {},
- "Number of buckets we did not need to request bucket info for "
- "due to load already having updated them.", this),
- _dirsListed("dirslisted", {},
- "Directories listed in list step of initialization.", this),
- _startTime(component.getClock()),
- _listLatency("listlatency", {},
- "Time used until list phase is done. (in ms)", this),
- _initLatency("initlatency", {},
- "Time used until initialization is complete. (in ms)", this)
-{
- component.registerMetric(*this);
-}
-
-StorageBucketDBInitializer::Metrics::~Metrics() {}
-
-StorageBucketDBInitializer::GlobalState::GlobalState()
- : _lists(), _joins(), _infoRequests(), _replies(),
- _insertedCount(0), _infoReadCount(0),
- _infoSetByLoad(0), _dirsListed(0), _dirsToList(0),
- _gottenInitProgress(false), _doneListing(false),
- _doneInitializing(false), _workerLock(), _workerCond(), _replyLock()
-{ }
-StorageBucketDBInitializer::GlobalState::~GlobalState() { }
-
-StorageBucketDBInitializer::StorageBucketDBInitializer(
- const config::ConfigUri & configUri,
- DoneInitializeHandler& doneInitializeHandler,
- ServiceLayerComponentRegister& compReg)
- : StorageLink("StorageBucketDBInitializer"),
- framework::HtmlStatusReporter("dbinit", "Bucket database initializer"),
- _config(configUri),
- _system(doneInitializeHandler, compReg, _config),
- _metrics(_system._component),
- _state(),
- _readState(1u)
-{
- // Initialize read state for disks being available
- for (uint32_t i=0; i< _readState.size(); ++i) {
- _readState[i] = std::make_unique<BucketSpaceReadState>();
- for (const auto &elem : _system._bucketSpaceRepo) {
- _readState[i]->emplace(elem.first, std::make_unique<BucketReadState>());
- _state._dirsToList += 1;
- }
- }
- _system._component.registerStatusPage(*this);
-}
-
-StorageBucketDBInitializer::~StorageBucketDBInitializer()
-{
- if (_system._thread.get() != 0) {
- LOG(error, "Deleted without calling close() first");
- onClose();
- }
- closeNextLink();
-}
-
-void
-StorageBucketDBInitializer::onOpen()
-{
- // Trigger bucket database initialization
- for (uint32_t i=0; i< _readState.size(); ++i) {
- assert(_readState[i]);
- const BucketSpaceReadState &spaceState = *_readState[i];
- for (const auto &stateElem : spaceState) {
- document::BucketSpace bucketSpace = stateElem.first;
- auto msg = std::make_shared<ReadBucketList>(bucketSpace, spi::PartitionId(i));
- _state._lists[msg->getMsgId()] = msg;
- sendDown(msg);
- }
- }
- _system._thread = _system._component.startThread(*this, 10ms, 1s);
-}
-
-void
-StorageBucketDBInitializer::onClose()
-{
- if (_system._thread) {
- _system._thread->interruptAndJoin(_state._workerCond);
- _system._thread.reset();
- }
-}
-
-void
-StorageBucketDBInitializer::run(framework::ThreadHandle& thread)
-{
- std::unique_lock<std::mutex> guard(_state._workerLock);
- while (!thread.interrupted() && !_state._doneInitializing) {
- std::list<api::StorageMessage::SP> replies;
- {
- std::lock_guard<std::mutex> replyGuard(_state._replyLock);
- _state._replies.swap(replies);
- }
- for (api::StorageMessage::SP & msg : replies) {
- api::InternalReply& reply(static_cast<api::InternalReply&>(*msg));
- if (reply.getType() == ReadBucketListReply::ID) {
- handleReadBucketListReply(static_cast<ReadBucketListReply&>(reply));
- } else if (reply.getType() == ReadBucketInfoReply::ID) {
- handleReadBucketInfoReply(static_cast<ReadBucketInfoReply&>(reply));
- } else if (reply.getType() == InternalBucketJoinReply::ID) {
- handleInternalBucketJoinReply(static_cast<InternalBucketJoinReply&>(reply));
- }
- }
- if (_state._gottenInitProgress) {
- _state._gottenInitProgress = false;
- updateInitProgress();
- }
- if (replies.empty()) {
- _state._workerCond.wait_for(guard, 10ms);
- thread.registerTick(framework::WAIT_CYCLE);
- } else {
- thread.registerTick(framework::PROCESS_CYCLE);
- }
- }
-}
-
-void
-StorageBucketDBInitializer::print(
- std::ostream& out, bool verbose, const std::string& indent) const
-{
- (void) verbose; (void) indent;
- out << "StorageBucketDBInitializer()";
-}
-
-namespace {
-
-size_t
-notDoneCount(const StorageBucketDBInitializer::ReadState &readState)
-{
- size_t result = 0;
- for (const auto &elem : readState) {
- if (elem) {
- for (const auto &stateElem : *elem) {
- if (!stateElem.second->_done) {
- ++result;
- }
- }
- }
- }
- return result;
-}
-
-}
-
-void
-StorageBucketDBInitializer::reportHtmlStatus(
- std::ostream& out, const framework::HttpUrlPath&) const
-{
- std::lock_guard<std::mutex> guard(_state._workerLock);
- out << "\n <h2>Config</h2>\n"
- << " <table>\n"
- << " <tr><td>Max pending info reads per disk</td><td>"
- << _config._maxPendingInfoReadsPerDisk << "</td></tr>\n"
- << " <tr><td>Min pending info reads per disk</td><td>"
- << _config._minPendingInfoReadsPerDisk << "</td></tr>\n"
- << " <tr><td>List priority</td><td>"
- << _config._listPriority << "</td></tr>\n"
- << " <tr><td>Info read priority</td><td>"
- << _config._infoReadPriority << "</td></tr>\n"
- << " </table>\n";
-
- out << "\n <h2>Init progress</h2>\n";
- if (_state._doneListing) {
- out << " Done listing.<br/>\n";
- } else {
- out << " Listed " << _state._dirsListed << " of "
- << _state._dirsToList << " partitions.<br/>\n";
- }
- if (_state._lists.empty()) {
- out << " No lists pending.<br/>\n";
- } else {
- out << " " << _state._lists.size() << " lists pending.<br/>\n";
- }
- if (_state._joins.empty()) {
- out << " No internal joins pending.<br/>\n";
- } else {
- out << " " << _state._joins.size()
- << " internal joins pending.<br/>\n";
- }
- if (_state._infoRequests.empty()) {
- out << " No info requests pending.<br/>\n";
- } else {
- out << " " << _state._infoRequests.size()
- << " info requests pending.<br/>\n";
- }
- uint32_t incompleteScan = notDoneCount(_readState);
- if (incompleteScan == 0) {
- out << " Done iterating bucket database to generate info "
- << "requests.<br/>\n";
- } else {
- out << " " << incompleteScan << " partitions still have buckets "
- << "that needs bucket info.<br/>\n";
- }
- out << " Init progress gotten after state update: "
- << (_state._gottenInitProgress ? "true" : "false") << "<br/>\n";
- if (_state._doneInitializing) {
- out << " Initialization complete.\n";
- } else {
- out << " Initialization not completed.\n";
- }
-
- out << "\n <h2>Metrics</h2>\n";
- out << " " << _metrics._insertedCount.toString(true) << "<br/>\n"
- << " " << _metrics._joinedCount.toString(true) << "<br/>\n"
- << " " << _metrics._infoReadCount.toString(true) << "<br/>\n"
- << " " << _metrics._infoSetByLoad.toString(true) << "<br/>\n"
- << " " << _metrics._dirsListed.toString(true) << "<br/>\n"
- << " Dirs to list " << _state._dirsToList << "<br/>\n";
- if (!_state._joins.empty()) {
- out << "\n <h2>Pending internal bucket joins</h2>\n";
- for (const auto & e : _state._joins) {
- out << " " << e.first << " - " << *e.second << "<br/>\n";
- }
- }
- out << "\n <h2>Info read state</h2>\n";
- std::map<Disk, uint32_t> pendingCounts;
- for (const auto & e : _state._infoRequests)
- {
- ++pendingCounts[e.second];
- }
- for (uint32_t i=0; i<_readState.size(); ++i) {
- if (_readState[i].get() == 0) {
- out << " <h3>Disk " << i << " is down</h3>\n";
- continue;
- }
- const BucketSpaceReadState& spaceState(*_readState[i]);
- for (const auto &stateElem : spaceState) {
- const BucketReadState &state = *stateElem.second;
- out << " <h3>Disk " << i << ", bucket space " << stateElem.first.getId() << "</h3>\n";
- out << " Pending info requests: " << pendingCounts[i] << " (";
- if (state._pending.empty()) {
- out << "none";
- } else {
- bool first = true;
- for (BucketSet::const_iterator it = state._pending.begin();
- it != state._pending.end(); ++it) {
- if (!first) {
- out << ", ";
- } else {
- first = false;
- }
- out << *it;
- }
- }
- out << ")<br/>\n";
- out << " Bucket database iterator: " << state._databaseIterator
- << "<br/>\n";
- out << " Done iterating bucket database. "
- << (state._done ? "true" : "false") << "<br/>\n";
- }
- }
- for (std::map<Disk, uint32_t>::iterator it = pendingCounts.begin();
- it != pendingCounts.end(); ++it)
- {
- out << " Disk " << it->first << ": " << it->second << "<br/>\n";
- }
-}
-
-// Always called from worker thread. Worker monitor already grabbed
-void
-StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket,
- const lib::Distribution &distribution,
- spi::PartitionId partition,
- api::BucketInfo bucketInfo)
-{
- document::BucketId bucketId(bucket.getBucketId());
- StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(bucket.getBucketSpace()).get(
- bucketId, "StorageBucketDBInitializer::registerBucket",
- StorBucketDatabase::CREATE_IF_NONEXISTING));
- if (bucketInfo.valid()) {
- if (entry.preExisted()) {
- LOG(debug, "Had value %s for %s before registering",
- entry->getBucketInfo().toString().c_str(),
- bucketId.toString().c_str());
- }
- LOG(debug, "Got new value %s from %s partition %u",
- bucketInfo.toString().c_str(), bucketId.toString().c_str(),
- partition.getValue());
- entry->setBucketInfo(bucketInfo);
- } else {
- LOG(debug, "Got invalid bucket info from %s partition %u: %s",
- bucketId.toString().c_str(), partition.getValue(),
- bucketInfo.toString().c_str());
- }
- if (entry.preExisted()) {
- if (0 == partition) {
- LOG(debug, "%s already existed in bucket database on disk %i. "
- "Might have been moved from wrong directory prior to "
- "listing this directory.",
- bucketId.toString().c_str(), int(partition));
- return;
- }
- uint32_t keepOnDisk, joinFromDisk;
- if (distribution.getPreferredAvailableDisk(
- _system._nodeState, _system._nodeIndex,
- bucketId.stripUnused()) == partition)
- {
- keepOnDisk = partition;
- joinFromDisk = 0;
- } else {
- keepOnDisk = 0;
- joinFromDisk = partition;
- }
- LOG(debug, "%s exist on both disk 0 and disk %i. Joining two versions "
- "onto disk %u.",
- bucketId.toString().c_str(), int(partition), keepOnDisk);
- entry.unlock();
- // Must not have bucket db lock while sending down
- auto cmd = std::make_shared<InternalBucketJoinCommand>(bucket, keepOnDisk, joinFromDisk);
- {
- _state._joins[cmd->getMsgId()] = cmd;
- }
- sendDown(cmd);
- } else {
- _system._component.getMinUsedBitsTracker().update(bucketId);
- LOG(spam, "Inserted %s on disk %i into bucket database",
- bucketId.toString().c_str(), int(partition));
- entry.write();
- uint16_t disk(distribution.getIdealDisk(
- _system._nodeState, _system._nodeIndex, bucketId.stripUnused(),
- lib::Distribution::IDEAL_DISK_EVEN_IF_DOWN));
- if (disk != partition) {
- _metrics._wrongDisk.inc();
- }
-
- _metrics._insertedCount.inc();
- ++_state._insertedCount;
- }
-}
-
-namespace {
- struct NextBucketOnDiskFinder {
- typedef document::BucketId BucketId;
-
- uint16_t _disk;
- BucketId& _iterator;
- uint16_t _count;
- std::vector<BucketId> _next;
- uint32_t _alreadySet;
-
- NextBucketOnDiskFinder(uint16_t disk, BucketId& iterator,
- uint16_t maxToFind)
- : _disk(disk), _iterator(iterator), _count(maxToFind),
- _next(), _alreadySet(0) {}
-
- StorBucketDatabase::Decision operator()(
- uint64_t revBucket, const StorBucketDatabase::Entry& entry)
- {
- BucketId bucket(BucketId::keyToBucketId(revBucket));
- if (bucket == _iterator) {
- //LOG(spam, "Ignoring bucket %s as it has value of current "
- // "iterator", bucket.toString().c_str());
- return StorBucketDatabase::Decision::CONTINUE;
- }
- _iterator = bucket;
- if (0 != _disk) {
- //LOG(spam, "Ignoring bucket %s as it is not on disk currently "
- // "being processed", bucket.toString().c_str());
- // Ignore. We only want to scan for one disk
- } else if (entry.valid()) {
- LOG(spam, "%s already initialized by load %s. "
- "Not requesting info",
- bucket.toString().c_str(),
- entry.getBucketInfo().toString().c_str());
- ++_alreadySet;
- } else {
- _next.push_back(_iterator);
- if (_next.size() >= _count) {
- LOG(spam, "Aborting iterating for disk %u as we have "
- "enough results. Leaving iterator at %s",
- uint32_t(_disk), _iterator.toString().c_str());
- return StorBucketDatabase::Decision::ABORT;
- }
- }
- return StorBucketDatabase::Decision::CONTINUE;
- }
- };
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk, document::BucketSpace bucketSpace)
-{
- auto itr = _readState[disk]->find(bucketSpace);
- assert(itr != _readState[disk]->end());
- BucketReadState& state = *itr->second;
- if (state._done
- || state._pending.size() >= _config._maxPendingInfoReadsPerDisk)
- {
- LOG(spam, "No need to iterate further. Database has completed "
- "iterating buckets for disk %u.", uint32_t(disk));
- return;
- }
- uint32_t count(_config._maxPendingInfoReadsPerDisk - state._pending.size());
- NextBucketOnDiskFinder finder(disk, state._databaseIterator, count);
- LOG(spam, "Iterating bucket db further. Starting at iterator %s",
- state._databaseIterator.toString().c_str());
- _system.getBucketDatabase(bucketSpace).for_each(
- std::ref(finder),
- "StorageBucketDBInitializer::readBucketInfo",
- state._databaseIterator.stripUnused().toKey());
- if (finder._alreadySet > 0) {
- _metrics._infoSetByLoad.inc(finder._alreadySet);
- _state._infoSetByLoad += finder._alreadySet;
- }
- for (uint32_t i=0; i<finder._next.size(); ++i) {
- document::Bucket bucket(bucketSpace, finder._next[i]);
- auto cmd = std::make_shared<ReadBucketInfo>(bucket);
- cmd->setPriority(_config._infoReadPriority);
- state._pending.insert(finder._next[i]);
- _state._infoRequests[cmd->getMsgId()] = disk;
- LOG(spam, "Requesting bucket info from %s on disk %u.",
- finder._next[i].toString().c_str(), uint32_t(disk));
- sendDown(cmd);
- }
- state._done |= finder._next.empty();
- _state._gottenInitProgress = true;
- checkIfDone();
-}
-
-bool
-StorageBucketDBInitializer::onDown(
- const std::shared_ptr<api::StorageMessage>& msg)
-{
- // If we're done listing, load can go as normal.
- // Rationale behind memory_order_relaxed: _doneListing is initially false
- // and is ever only written once. Since the behavior for temporarily
- // reading a stale default is safe (block the message) and we do not
- // access any other shared state dependent on _doneListing, relaxed
- // semantics should be fine here.
- if (_state._doneListing.load(std::memory_order_relaxed)) {
- return StorageLink::onDown(msg);
- }
-
- // If we're not done listing, block most types of load
-
-    // There are no known replies, but if any were to arrive, they should
- // likely not be blocked.
- if (msg->getType().isReply()) return false;
-
- switch (msg->getType().getId()) {
- // Don't want to block communication with state manager
- case api::MessageType::SETSYSTEMSTATE_ID:
- case api::MessageType::GETNODESTATE_ID:
- return StorageLink::onDown(msg);
- default:
- break;
- }
- // Fail everything else
- std::ostringstream ost;
- ost << "Cannot perform operation " << msg->getType() << " now because "
- << "we are still listing buckets from disk.";
- LOGBP(warning, "%s", ost.str().c_str());
- std::unique_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*msg).makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, ost.str()));
- sendUp(std::shared_ptr<api::StorageReply>(reply.release()));
- return true;
-}
-
-// Called from disk threads. Just push replies to reply list so worker thread
-// can handle it. This minimizes locking needed. Disk reads should be the
-// limiting factor, so we don't need to update initializer state in multiple
-// threads.
-bool
-StorageBucketDBInitializer::onInternalReply(
- const std::shared_ptr<api::InternalReply>& reply)
-{
- switch(reply->getType()) {
- case ReadBucketListReply::ID:
- case ReadBucketInfoReply::ID:
- case InternalBucketJoinReply::ID:
- {
- std::lock_guard<std::mutex> guard(_state._replyLock);
- _state._replies.push_back(reply);
- return true;
- }
- default:
- return false;
- }
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::handleReadBucketListReply(
- ReadBucketListReply& reply)
-{
- vespalib::hash_map<api::StorageMessage::Id,
- ReadBucketList::SP>::iterator it(
- _state._lists.find(reply.getMsgId()));
- if (it == _state._lists.end()) {
- LOGBP(warning, "Got bucket list reply for partition %u, request "
- "%" PRIu64 ", that was not registered pending.",
- reply.getPartition().getValue(), reply.getMsgId());
- } else {
- _state._lists.erase(it);
- }
- // We don't handle failed bucket listings. Kill process. Bucket lists are
- // essential for storage node operations
- if (reply.getResult().failed()) {
- LOG(debug, "Got failing bucket list reply. Requesting shutdown");
- _system._component.requestShutdown(
- "Failed to list buckets. Cannot run without bucket list: "
- + reply.getResult().toString());
- return;
- }
- _metrics._dirsListed.inc();
- _state._gottenInitProgress = true;
- const spi::BucketIdListResult::List& list(reply.getBuckets());
- api::BucketInfo info;
- assert(!info.valid());
- const auto &contentBucketSpace(_system._bucketSpaceRepo.get(reply.getBucketSpace()));
- auto distribution(contentBucketSpace.getDistribution());
- for (uint32_t i=0, n=list.size(); i<n; ++i) {
- registerBucket(document::Bucket(reply.getBucketSpace(), list[i]), *distribution, reply.getPartition(), info);
- }
- if (++_state._dirsListed == _state._dirsToList) {
- handleListingCompleted();
- }
- checkIfDone();
- sendReadBucketInfo(reply.getPartition(), reply.getBucketSpace());
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::handleReadBucketInfoReply(
- ReadBucketInfoReply& reply)
-{
- document::BucketSpace bucketSpace = reply.getBucket().getBucketSpace();
- if (reply.getResult().failed()) {
- LOGBP(warning, "Deleting %s from bucket database. Cannot use it as we "
- "failed to read bucket info for it: %s",
- reply.getBucketId().toString().c_str(),
- reply.getResult().toString().c_str());
- _system.getBucketDatabase(bucketSpace).erase(reply.getBucketId(),
- "dbinit.failedreply");
- }
- _metrics._infoReadCount.inc();
- ++_state._infoReadCount;
- _state._gottenInitProgress = true;
- vespalib::hash_map<api::StorageMessage::Id, Disk>::iterator it(
- _state._infoRequests.find(reply.getMsgId()));
- if (it == _state._infoRequests.end()) {
- LOGBP(warning, "Got bucket info reply for %s, request %" PRIu64 ", that "
- "was not registered pending.",
- reply.getBucketId().toString().c_str(), reply.getMsgId());
- checkIfDone();
- } else {
- uint32_t disk(it->second);
- _state._infoRequests.erase(it->first);
- auto itr = _readState[disk]->find(bucketSpace);
- assert(itr != _readState[disk]->end());
- BucketReadState& state = *itr->second;
- BucketSet::iterator it2(state._pending.find(reply.getBucketId()));
- if (it2 == state._pending.end()) {
- LOGBP(warning, "Got bucket info reply for %s that was registered "
- "in global state but not in disk %u's state.",
- reply.getBucketId().toString().c_str(), disk);
- } else {
- state._pending.erase(reply.getBucketId());
- LOG(spam, "Got info reply for %s: %s",
- reply.getBucketId().toString().c_str(),
- _system.getBucketDatabase(reply.getBucket().getBucketSpace()).get(
- reply.getBucketId(), "dbinit.inforeply")
- ->getBucketInfo().toString().c_str());
- }
- checkIfDone();
- sendReadBucketInfo(spi::PartitionId(disk), bucketSpace);
- }
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::handleInternalBucketJoinReply(
- InternalBucketJoinReply& reply)
-{
- _metrics._joinedCount.inc();
- vespalib::hash_map<api::StorageMessage::Id,
- InternalBucketJoinCommand::SP>::iterator it(
- _state._joins.find(reply.getMsgId()));
- if (reply.getResult().failed()) {
- LOGBP(warning, "Failed to join multiple copies of %s. One of the "
- "versions will not be available: %s",
- reply.getBucketId().toString().c_str(),
- reply.getResult().toString().c_str());
- }
- if (it != _state._joins.end()) {
- _state._joins.erase(reply.getMsgId());
- LOG(debug, "Completed internal bucket join for %s. Got bucket info %s",
- reply.getBucketId().toString().c_str(),
- reply.getBucketInfo().toString().c_str());
- StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(reply.getBucket().getBucketSpace()).get(
- reply.getBucketId(),
- "StorageBucketDBInitializer::onInternalBucketJoinReply"));
- entry->setBucketInfo(reply.getBucketInfo());
- entry.write();
- } else {
- LOGBP(warning, "Got internal join reply for %s which was not "
- "registered to be pending.",
- reply.getBucketId().toString().c_str());
- }
- checkIfDone();
-}
-
-namespace {
-
-bool
-isDone(const StorageBucketDBInitializer::ReadState &readState)
-{
- return notDoneCount(readState) == 0;
-}
-
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::checkIfDone()
-{
- if (_state._dirsListed < _state._dirsToList) return;
- if (!_state._infoRequests.empty()) return;
- if (!_state._joins.empty()) return;
- if (!isDone(_readState)) {
- return;
- }
- _state._doneInitializing = true;
- _system._doneInitializeHandler.notifyDoneInitializing();
- _metrics._initLatency.addValue(_metrics._startTime.getElapsedTimeAsDouble());
- LOG(debug, "Completed initializing");
-}
-
-double
-StorageBucketDBInitializer::calculateMinProgressFromDiskIterators() const
-{
- double minProgress = 1.0;
- for (size_t disk = 0; disk < _readState.size(); ++disk) {
- if (_readState[disk].get() == 0) {
- continue;
- }
- for (const auto &stateElem : *_readState[disk]) {
- const BucketReadState &state = *stateElem.second;
- document::BucketId bid(state._databaseIterator);
-
- double progress;
- if (!state._done) {
- progress = BucketProgressCalculator::calculateProgress(bid);
- } else {
- progress = 1.0;
- }
-
- minProgress = std::min(minProgress, progress);
- }
- }
- //std::cerr << "minProgress: " << minProgress << "\n";
- return minProgress;
-}
-
-// Always called from worker thread. It holds worker monitor.
-double
-StorageBucketDBInitializer::calcInitProgress() const
-{
- double listProgress(_state._dirsToList == 0
- ? 0 : _state._dirsListed / _state._dirsToList);
- // Do sanity check
- if (_state._dirsListed > _state._dirsToList) {
- LOG(error, "%" PRIu64 " of %u dirs are reported listed. This is a bug.",
- _state._dirsListed, _state._dirsToList);
- listProgress = 1.0;
- }
- double infoProgress(calculateMinProgressFromDiskIterators());
- if (_state._dirsToList > _state._dirsListed
- && infoProgress > 0)
- {
- LOG(debug, "Not done with list step yet. (%" PRIu64 " of %u done). "
- "Need to nullify info part of progress so fleetcontroller "
- "doesn't think listing is completed.",
- _state._dirsListed, _state._dirsToList);
- infoProgress = 0;
-
-        // Currently we never honor the complete_list_before_starting_read
-        // option. We might want to do that later, in order to be able to
-        // enforce waiting to read. For instance, if we have a use case where
-        // several directories map to the same disk, such that reading info
-        // slows down directory listing to such an extent that quick restart
-        // isn't quick enough anymore. If we do, revert to making this an
-        // error if that config option is enabled.
- }
-
- double listLimit = lib::NodeState::getListingBucketsInitProgressLimit();
- double progress(listLimit * listProgress
- + (1.0 - listLimit) * infoProgress);
- assert(progress < 1.000000001);
- return progress;
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::updateInitProgress() const
-{
- double progress = calcInitProgress();
- NodeStateUpdater::Lock::SP lock(
- _system._component.getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(
- *_system._component.getStateUpdater().getReportedNodeState());
- LOG(debug, "Reporting node init progress as %g", progress);
- ns.setInitProgress(progress);
- ns.setMinUsedBits(_system._component.getMinUsedBitsTracker()
- .getMinUsedBits());
- _system._component.getStateUpdater().setReportedNodeState(ns);
-}
-
-// Always called from worker thread. It holds worker monitor.
-void
-StorageBucketDBInitializer::handleListingCompleted()
-{
- assert(!_state._doneListing);
- _state._doneListing = true;
- if (_state._dirsToList != _state._dirsListed) {
- LOG(warning, "After list phase completed, counters indicate we've "
- "listed %" PRIu64 " of %u directories. This is a bug.",
- _state._dirsListed, _state._dirsToList);
- }
- LOG(info, "Completed listing buckets from disk. Minimum used bits is %u",
- _system._component.getMinUsedBitsTracker().getMinUsedBits());
- _metrics._listLatency.addValue(_metrics._startTime.getElapsedTimeAsDouble());
-}
-
-double
-StorageBucketDBInitializer::
-BucketProgressCalculator::calculateProgress(const document::BucketId& bid)
-{
- uint64_t revBucket(document::BucketId::bucketIdToKey(bid.getId()));
-
- // Remove unused bits
- uint64_t progressBits(revBucket >> (64 - bid.getUsedBits()));
-/*
- std::cerr << bid << ":\n";
- std::cerr << "revBucket: " << std::hex << revBucket << ", progressBits: " << progressBits
- << ", divisor: " << (1ULL << bid.getUsedBits())
- << ", result= " << (static_cast<double>(progressBits) / (1ULL << bid.getUsedBits()))
- << "\n";
-*/
- return static_cast<double>(progressBits) / (1ULL << bid.getUsedBits());
-}
-
-} // storage
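The run()/onInternalReply() pair in the removed file above implements a simple hand-off pattern: disk threads only append replies under a small reply lock, and the single worker thread swaps the entire list out before processing it, so the shared lock is only ever held for an O(1) splice. A minimal sketch of that pattern follows, under assumed names; Reply, ReplyQueue, push and runOnce are illustrative stand-ins, not the actual Vespa types.

#include <chrono>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

struct Reply { int type; };

class ReplyQueue {
    std::mutex _replyLock;                      // protects only _replies
    std::list<std::shared_ptr<Reply>> _replies;
    std::mutex _workerLock;                     // held while the worker works
    std::condition_variable _workerCond;

public:
    // Called from the disk threads: enqueue under the small lock and return.
    void push(std::shared_ptr<Reply> reply) {
        {
            std::lock_guard<std::mutex> guard(_replyLock);
            _replies.push_back(std::move(reply));
        }
        _workerCond.notify_one();
    }

    // Called repeatedly from the single worker thread.
    void runOnce() {
        std::unique_lock<std::mutex> guard(_workerLock);
        std::list<std::shared_ptr<Reply>> batch;
        {
            std::lock_guard<std::mutex> replyGuard(_replyLock);
            _replies.swap(batch);               // O(1); reply lock held briefly
        }
        for (const auto& reply : batch) {
            (void)reply;  // dispatch on reply->type, as handleReadBucketListReply
                          // and friends did in the removed code
        }
        if (batch.empty()) {
            // Nothing to do: sleep briefly, like the 10ms wait in run().
            _workerCond.wait_for(guard, std::chrono::milliseconds(10));
        }
    }
};

int main() {
    ReplyQueue q;
    q.push(std::make_shared<Reply>(Reply{1}));
    q.runOnce();  // processes the pushed reply
    return 0;
}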
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
deleted file mode 100644
index 00d39965151..00000000000
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class storage::StorageBucketDBInitializer
- * \ingroup bucketdb
- *
- * \brief Initializes the bucket database on the storage node.
- *
- * The storage bucket DB initializer is responsible for initializing the bucket
- * database on the storage node. This used to be the task of the bucket manager,
- * but to make the implementation cleaner, the logic for this has been separated.
- *
- * This works as follows:
- *
- * 1. When the component is started (onOpen), partition states should already
- *    have been acquired from the SPI and made available to this class. Requests
- *    for listing buckets will be sent to all partitions. A background thread
- *    will be started to avoid doing processing in the thread sending replies.
- *
- * 2. Upon receiving bucket lists in the background thread, the bucket database
- *    will be populated with buckets. Bucket information may or may not be valid
- *    at this point, depending on the persistence provider. Providers that can
- *    list cheaply but where getting info is more expensive will likely want to
- *    return invalid entries, as the node can start handling load as soon as the
- *    bucket lists are known. Providers that get info and bucket lists equally
- *    cheaply will likely prefer to provide info at once to avoid the read step.
- *
- * 3. Upon receiving the last bucket list, the background thread will start
- *    doing the remaining work.
- *
- * 4. The background thread will iterate through the bucket database, issuing
- *    bucket info requests for all buckets that have invalid bucket info. Once
- *    the whole bucket database has been iterated and there are no longer any
- *    pending operations, initialization is complete and the node will be
- *    tagged as being in the up state.
- */
-
-#pragma once
-
-#include <atomic>
-#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/metrics/metrics.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/storage/bucketdb/minimumusedbitstracker.h>
-#include <vespa/storage/common/bucketmessages.h>
-#include <vespa/storage/common/doneinitializehandler.h>
-#include <vespa/storage/common/servicelayercomponent.h>
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
-#include <vespa/storageframework/generic/clock/timer.h>
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vdslib/state/nodestate.h>
-#include <vespa/config/subscription/configuri.h>
-#include <list>
-#include <unordered_map>
-#include <mutex>
-#include <condition_variable>
-
-namespace storage {
-
-struct BucketReadState;
-
-class StorageBucketDBInitializer : public StorageLink,
- public framework::HtmlStatusReporter,
- private framework::Runnable
-{
- typedef uint16_t Disk;
- typedef vespalib::hash_map<api::StorageMessage::Id, Disk> IdDiskMap;
-
- struct Config {
- // List priority should be larger than info priority.
- uint16_t _listPriority;
- uint16_t _infoReadPriority;
- // When going below this amount of pending, send more until we reach max
- uint16_t _minPendingInfoReadsPerDisk;
- uint16_t _maxPendingInfoReadsPerDisk;
-
- Config(const config::ConfigUri & configUri);
- };
- struct System {
- DoneInitializeHandler& _doneInitializeHandler;
- ServiceLayerComponent _component;
- const ContentBucketSpaceRepo& _bucketSpaceRepo;
- uint32_t _nodeIndex;
- lib::NodeState _nodeState; // Disk info for ideal state calculations
- framework::Thread::UP _thread;
-
- System(DoneInitializeHandler& doneInitializeHandler,
- ServiceLayerComponentRegister&,
- const Config&);
- ~System();
-
- StorBucketDatabase &getBucketDatabase(document::BucketSpace bucketSpace) const;
- };
- struct Metrics : public metrics::MetricSet {
- metrics::LongCountMetric _wrongDisk;
- metrics::LongCountMetric _insertedCount;
- metrics::LongCountMetric _joinedCount;
- metrics::LongCountMetric _infoReadCount;
- metrics::LongCountMetric _infoSetByLoad;
- metrics::LongCountMetric _dirsListed;
- framework::MilliSecTimer _startTime;
- metrics::LongAverageMetric _listLatency;
- metrics::LongAverageMetric _initLatency;
-
- Metrics(framework::Component&);
- ~Metrics();
- };
- struct GlobalState {
- vespalib::hash_map<api::StorageMessage::Id, ReadBucketList::SP> _lists;
- vespalib::hash_map<api::StorageMessage::Id, InternalBucketJoinCommand::SP> _joins;
- IdDiskMap _infoRequests;
- std::list<api::StorageMessage::SP> _replies;
- uint64_t _insertedCount;
- uint64_t _infoReadCount;
- uint64_t _infoSetByLoad;
- uint64_t _dirsListed;
- uint32_t _dirsToList;
- bool _gottenInitProgress;
- std::atomic<bool> _doneListing;
- bool _doneInitializing;
- // This lock is held while the worker thread is working, such that
-        // status retrieval can lock it. The listing part only grabs it when
-        // needed, to support listing in multiple threads.
- mutable std::mutex _workerLock;
- std::condition_variable _workerCond;
- // This lock protects the reply list.
- std::mutex _replyLock;
-
- GlobalState();
- ~GlobalState();
- };
-
-public:
- using BucketSpaceReadState = std::unordered_map<document::BucketSpace,
- std::unique_ptr<BucketReadState>, document::BucketSpace::hash>;
- using ReadState = std::vector<std::unique_ptr<BucketSpaceReadState>>;
-
-private:
- Config _config;
- System _system;
- Metrics _metrics;
- GlobalState _state;
- ReadState _readState;
-
-public:
- StorageBucketDBInitializer(const config::ConfigUri&,
- DoneInitializeHandler&,
- ServiceLayerComponentRegister&);
- ~StorageBucketDBInitializer();
-
- void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
- void onOpen() override;
- void onClose() override;
-
- void run(framework::ThreadHandle&) override;
-
- bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
- bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override;
-
- void handleReadBucketListReply(ReadBucketListReply&);
- void handleReadBucketInfoReply(ReadBucketInfoReply&);
- void handleInternalBucketJoinReply(InternalBucketJoinReply&);
-
- /** Status implementation. */
- void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
-
- // The below functions should only be called by the class itself, but they
-    // are left public for ease of access from unit tests and anonymous
-    // classes defined in the implementation.
-
- /** Get the path of a given directory. */
- std::string getPathName(std::vector<uint32_t>& path,
- const document::BucketId* = 0) const;
- /** Process a given file found through listing files on disk */
- bool processFile(std::vector<uint32_t>& path, const std::string& pathName,
- const std::string& name);
- /**
- * Find what bucket identifier file corresponds to.
- * Invalid bucket indicates none. (Invalid file name)
- */
- document::BucketId extractBucketId(const std::string& name) const;
- /**
- * Handle that the bucket might have been found in the wrong position.
-     * Returns true if we should attempt to register the bucket.
- */
- bool handleBadLocation(const document::BucketId&,
- std::vector<uint32_t>& path);
- /** Register a bucket in the bucket database. */
- void registerBucket(const document::Bucket &bucket,
- const lib::Distribution &distribution,
- spi::PartitionId,
- api::BucketInfo bucketInfo);
- /**
- * Sends more read bucket info to a given disk. Lock must already be taken.
- * Will be released by function prior to sending messages down.
- */
- void sendReadBucketInfo(spi::PartitionId, document::BucketSpace bucketSpace);
- /** Check whether initialization is complete. Should hold lock to call it.*/
- void checkIfDone();
-
- /** Calculate minimum progress from all disks' bucket db iterators */
- double calculateMinProgressFromDiskIterators() const;
- /** Calculate how far we have progressed initializing. */
- double calcInitProgress() const;
-    /** Update node state if init progress has changed enough. */
- void updateInitProgress() const;
- /** Handle that we're done listing buckets. */
- void handleListingCompleted();
-
- /** Used for unit tests to see that stuff has happened. */
- virtual const Metrics& getMetrics() const { return _metrics; }
-
-
- class BucketProgressCalculator
- {
- public:
- /**
- * Estimate progress into the total bucket space.
- * Done by taking reverse bucket key, shifting away unused bits and
- * dividing the result by 2**used bits to get approximate progress.
- * @param bid Current bucket space iterator/cursor.
- */
- static double calculateProgress(const document::BucketId& bid);
- };
-};
-
-} // storage
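The removed header above documents the two-phase initialization (a list phase, then an info-read phase); calcInitProgress() in the removed .cpp blended the two phases into the single progress number reported to the fleetcontroller. A sketch of that blend follows; kListLimit stands in for lib::NodeState::getListingBucketsInitProgressLimit() and its value here is an assumption, and the listing ratio is computed in floating point (the removed code divided the two counters as integers).

#include <cstdio>

// Stand-in for lib::NodeState::getListingBucketsInitProgressLimit();
// the actual value is an assumption here.
constexpr double kListLimit = 0.01;

double calcInitProgress(unsigned dirsListed, unsigned dirsToList,
                        double infoProgress) {
    double listProgress = (dirsToList == 0)
        ? 0.0
        : static_cast<double>(dirsListed) / dirsToList;
    // While listing is incomplete, nullify info progress so the
    // fleetcontroller never believes the list phase has completed.
    if (dirsListed < dirsToList && infoProgress > 0) {
        infoProgress = 0.0;
    }
    return kListLimit * listProgress + (1.0 - kListLimit) * infoProgress;
}

int main() {
    // Halfway through listing: only the list term contributes.
    std::printf("%g\n", calcInitProgress(1, 2, 0.9));  // 0.005
    // Listing done, info phase halfway: 0.01 + 0.99 * 0.5.
    std::printf("%g\n", calcInitProgress(2, 2, 0.5));  // 0.505
    return 0;
}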
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 6c48add6f53..e7ce6737ac8 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -13,7 +13,6 @@
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/storage/bucketdb/storagebucketdbinitializer.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/persistence/spi/exceptions.h>