summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Geir Storli <geirst@oath.com> 2017-11-13 15:25:43 +0000
committer: Geir Storli <geirst@oath.com> 2017-11-13 15:25:43 +0000
commit: f74af1fbe7eb1be0a485942506f4458a3e183955 (patch)
tree: a43a700871a985349ca16dc79bd9451ab1461754 /storage
parent: 9bb55560ce93affa2685f7c787339b9994a1c5f8 (diff)
Make StorageBucketDBInitializer bucket space aware.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp 153
-rw-r--r-- storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h 14
2 files changed, 113 insertions, 54 deletions
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
index 3a832f0fe3b..15338d733b3 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
@@ -5,6 +5,7 @@
#include "config-stor-bucket-init.h"
#include "storbucketdb.h"
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/common/content_bucket_space_repo.h>
#include <vespa/storage/storageserver/storagemetricsset.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/io/fileutil.h>
@@ -65,7 +66,7 @@ StorageBucketDBInitializer::System::System(
: _doneInitializeHandler(doneInitializeHandler),
_component(compReg, "storagebucketdbinitializer"),
_partitions(partitions),
- _bucketDatabase(_component.getBucketDatabase(BucketSpace::placeHolder())),
+ _bucketSpaceRepo(_component.getBucketSpaceRepo()),
_nodeIndex(_component.getIndex()),
_distribution(*_component.getDistribution()),
_nodeState()
@@ -82,6 +83,12 @@ StorageBucketDBInitializer::System::System(
}
}
+StorBucketDatabase & // Returns what _component.getBucketDatabase() yields for bucketSpace; per-space lookup used in place of the removed _bucketDatabase member.
+StorageBucketDBInitializer::System::getBucketDatabase(document::BucketSpace bucketSpace) const
+{
+ return _component.getBucketDatabase(bucketSpace);
+}
+
StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component)
: metrics::MetricSet("dbinit", "",
"Metrics for the storage bucket database initializer"),
@@ -134,7 +141,10 @@ StorageBucketDBInitializer::StorageBucketDBInitializer(
// Initialize read state for disks being available
for (uint32_t i=0; i<_system._partitions.size(); ++i) {
if (!_system._partitions[i].isUp()) continue;
- _readState[i] = BucketReadState::UP(new BucketReadState);
+ _readState[i] = std::make_unique<BucketSpaceReadState>();
+ for (const auto &elem : _system._bucketSpaceRepo) {
+ _readState[i]->insert(std::make_pair(elem.first, std::make_unique<BucketReadState>()));
+ }
_state._dirsToList += 1;
}
_system._component.registerStatusPage(*this);
@@ -155,9 +165,14 @@ StorageBucketDBInitializer::onOpen()
// Trigger bucket database initialization
for (uint32_t i=0; i<_system._partitions.size(); ++i) {
if (!_system._partitions[i].isUp()) continue;
- ReadBucketList::SP msg(new ReadBucketList(BucketSpace::placeHolder(), spi::PartitionId(i)));
- _state._lists[msg->getMsgId()] = msg;
- sendDown(msg);
+ assert(_readState[i].get() != nullptr);
+ const BucketSpaceReadState &spaceState = *_readState[i];
+ for (const auto &stateElem : spaceState) {
+ document::BucketSpace bucketSpace = stateElem.first;
+ ReadBucketList::SP msg(new ReadBucketList(bucketSpace, spi::PartitionId(i)));
+ _state._lists[msg->getMsgId()] = msg;
+ sendDown(msg);
+ }
}
framework::MilliSecTime maxProcessingTime(10);
framework::MilliSecTime sleepTime(1000);
@@ -220,6 +235,26 @@ StorageBucketDBInitializer::print(
out << "StorageBucketDBInitializer()";
}
+namespace {
+// Counts, over every disk slot and every bucket space in it, the bucket read states whose _done flag is still false.
+size_t
+notDoneCount(const StorageBucketDBInitializer::ReadState &readState)
+{
+ size_t result = 0;
+ for (const auto &elem : readState) {
+ if (elem.get() != nullptr) { // null slot: no read state was created for this disk (the constructor skips partitions that are not up)
+ for (const auto &stateElem : *elem) {
+ if (!stateElem.second->_done) {
+ ++result;
+ }
+ }
+ }
+ }
+ return result;
+}
+
+} // anonymous namespace
+
void
StorageBucketDBInitializer::reportHtmlStatus(
std::ostream& out, const framework::HttpUrlPath&) const
@@ -261,10 +296,7 @@ StorageBucketDBInitializer::reportHtmlStatus(
out << " " << _state._infoRequests.size()
<< " info requests pending.<br/>\n";
}
- uint32_t incompleteScan = 0;
- for (uint32_t i=0; i<_readState.size(); ++i) {
- if (_readState[i].get() != 0 && !_readState[i]->_done) ++incompleteScan;
- }
+ uint32_t incompleteScan = notDoneCount(_readState);
if (incompleteScan == 0) {
out << " Done iterating bucket database to generate info "
<< "requests.<br/>\n";
@@ -304,29 +336,31 @@ StorageBucketDBInitializer::reportHtmlStatus(
out << " <h3>Disk " << i << " is down</h3>\n";
continue;
}
- BucketReadState& state(*_readState[i]);
- out << " <h3>Disk " << i << "</h3>\n";
- out << " Pending info requests: " << pendingCounts[i] << " (";
- if (state._pending.empty()) {
- out << "none";
- } else {
- bool first = true;
- for (BucketSet::const_iterator it = state._pending.begin();
- it != state._pending.end(); ++it)
- {
- if (!first) {
- out << ", ";
- } else {
- first = false;
+ const BucketSpaceReadState& spaceState(*_readState[i]);
+ for (const auto &stateElem : spaceState) {
+ const BucketReadState &state = *stateElem.second;
+ out << " <h3>Disk " << i << ", bucket space " << stateElem.first.getId() << "</h3>\n";
+ out << " Pending info requests: " << pendingCounts[i] << " (";
+ if (state._pending.empty()) {
+ out << "none";
+ } else {
+ bool first = true;
+ for (BucketSet::const_iterator it = state._pending.begin();
+ it != state._pending.end(); ++it) {
+ if (!first) {
+ out << ", ";
+ } else {
+ first = false;
+ }
+ out << *it;
}
- out << *it;
}
+ out << ")<br/>\n";
+ out << " Bucked database iterator: " << state._databaseIterator
+ << "<br/>\n";
+ out << " Done iterating bucket database. "
+ << (state._done ? "true" : "false") << "<br/>\n";
}
- out << ")<br/>\n";
- out << " Bucked database iterator: " << state._databaseIterator
- << "<br/>\n";
- out << " Done iterating bucket database. "
- << (state._done ? "true" : "false") << "<br/>\n";
}
for (std::map<Disk, uint32_t>::iterator it = pendingCounts.begin();
it != pendingCounts.end(); ++it)
@@ -342,7 +376,7 @@ StorageBucketDBInitializer::registerBucket(const document::Bucket &bucket,
api::BucketInfo bucketInfo)
{
document::BucketId bucketId(bucket.getBucketId());
- StorBucketDatabase::WrappedEntry entry(_system._bucketDatabase.get(
+ StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(bucket.getBucketSpace()).get(
bucketId, "StorageBucketDBInitializer::registerBucket",
StorBucketDatabase::CREATE_IF_NONEXISTING));
if (bucketInfo.valid()) {
@@ -459,9 +493,11 @@ namespace {
// Always called from worker thread. It holds worker monitor.
void
-StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk)
+StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk, document::BucketSpace bucketSpace)
{
- BucketReadState& state(*_readState[disk]);
+ auto itr = _readState[disk]->find(bucketSpace);
+ assert(itr != _readState[disk]->end());
+ BucketReadState& state = *itr->second;
if (state._done
|| state._pending.size() >= _config._maxPendingInfoReadsPerDisk)
{
@@ -473,7 +509,7 @@ StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk)
NextBucketOnDiskFinder finder(disk, state._databaseIterator, count);
LOG(spam, "Iterating bucket db further. Starting at iterator %s",
state._databaseIterator.toString().c_str());
- _system._bucketDatabase.all(finder,
+ _system.getBucketDatabase(bucketSpace).all(finder,
"StorageBucketDBInitializer::readBucketInfo",
state._databaseIterator.stripUnused().toKey());
if (finder._alreadySet > 0) {
@@ -481,7 +517,7 @@ StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk)
_state._infoSetByLoad += finder._alreadySet;
}
for (uint32_t i=0; i<finder._next.size(); ++i) {
- document::Bucket bucket(BucketSpace::placeHolder(), finder._next[i]);
+ document::Bucket bucket(bucketSpace, finder._next[i]);
ReadBucketInfo::SP cmd(new ReadBucketInfo(bucket));
cmd->setPriority(_config._infoReadPriority);
state._pending.insert(finder._next[i]);
@@ -593,7 +629,7 @@ StorageBucketDBInitializer::handleReadBucketListReply(
handleListingCompleted();
}
checkIfDone();
- sendReadBucketInfo(reply.getPartition());
+ sendReadBucketInfo(reply.getPartition(), reply.getBucketSpace());
}
// Always called from worker thread. It holds worker monitor.
@@ -601,12 +637,13 @@ void
StorageBucketDBInitializer::handleReadBucketInfoReply(
ReadBucketInfoReply& reply)
{
+ document::BucketSpace bucketSpace = reply.getBucket().getBucketSpace();
if (reply.getResult().failed()) {
LOGBP(warning, "Deleting %s from bucket database. Cannot use it as we "
"failed to read bucket info for it: %s",
reply.getBucketId().toString().c_str(),
reply.getResult().toString().c_str());
- _system._bucketDatabase.erase(reply.getBucketId(),
+ _system.getBucketDatabase(bucketSpace).erase(reply.getBucketId(),
"dbinit.failedreply");
}
_metrics._infoReadCount.inc();
@@ -622,7 +659,9 @@ StorageBucketDBInitializer::handleReadBucketInfoReply(
} else {
uint32_t disk(it->second);
_state._infoRequests.erase(it->first);
- BucketReadState& state(*_readState[disk]);
+ auto itr = _readState[disk]->find(bucketSpace);
+ assert(itr != _readState[disk]->end());
+ BucketReadState& state = *itr->second;
BucketSet::iterator it2(state._pending.find(reply.getBucketId()));
if (it2 == state._pending.end()) {
LOGBP(warning, "Got bucket info reply for %s that was registered "
@@ -632,12 +671,12 @@ StorageBucketDBInitializer::handleReadBucketInfoReply(
state._pending.erase(reply.getBucketId());
LOG(spam, "Got info reply for %s: %s",
reply.getBucketId().toString().c_str(),
- _system._bucketDatabase.get(
+ _system.getBucketDatabase(reply.getBucket().getBucketSpace()).get(
reply.getBucketId(), "dbinit.inforeply")
->getBucketInfo().toString().c_str());
}
checkIfDone();
- sendReadBucketInfo(spi::PartitionId(disk));
+ sendReadBucketInfo(spi::PartitionId(disk), bucketSpace);
}
}
@@ -661,7 +700,7 @@ StorageBucketDBInitializer::handleInternalBucketJoinReply(
LOG(debug, "Completed internal bucket join for %s. Got bucket info %s",
reply.getBucketId().toString().c_str(),
reply.getBucketInfo().toString().c_str());
- StorBucketDatabase::WrappedEntry entry(_system._bucketDatabase.get(
+ StorBucketDatabase::WrappedEntry entry(_system.getBucketDatabase(reply.getBucket().getBucketSpace()).get(
reply.getBucketId(),
"StorageBucketDBInitializer::onInternalBucketJoinReply"));
entry->setBucketInfo(reply.getBucketInfo());
@@ -674,6 +713,16 @@ StorageBucketDBInitializer::handleInternalBucketJoinReply(
checkIfDone();
}
+namespace {
+// True when notDoneCount() reports zero, i.e. no existing bucket read state has _done unset.
+bool
+isDone(const StorageBucketDBInitializer::ReadState &readState)
+{
+ return notDoneCount(readState) == 0;
+}
+
+} // anonymous namespace
+
// Always called from worker thread. It holds worker monitor.
void
StorageBucketDBInitializer::checkIfDone()
@@ -681,8 +730,8 @@ StorageBucketDBInitializer::checkIfDone()
if (_state._dirsListed < _state._dirsToList) return;
if (!_state._infoRequests.empty()) return;
if (!_state._joins.empty()) return;
- for (uint32_t i=0; i<_readState.size(); ++i) {
- if (_readState[i].get() != 0 && !_readState[i]->_done) return;
+ if (!isDone(_readState)) {
+ return;
}
_state._doneInitializing = true;
_system._doneInitializeHandler.notifyDoneInitializing();
@@ -698,17 +747,19 @@ StorageBucketDBInitializer::calculateMinProgressFromDiskIterators() const
if (_readState[disk].get() == 0) {
continue;
}
- const BucketReadState& state(*_readState[disk]);
- document::BucketId bid(state._databaseIterator);
+ for (const auto &stateElem : *_readState[disk]) {
+ const BucketReadState &state = *stateElem.second;
+ document::BucketId bid(state._databaseIterator);
- double progress;
- if (!state._done) {
- progress = BucketProgressCalculator::calculateProgress(bid);
- } else {
- progress = 1.0;
- }
+ double progress;
+ if (!state._done) {
+ progress = BucketProgressCalculator::calculateProgress(bid);
+ } else {
+ progress = 1.0;
+ }
- minProgress = std::min(minProgress, progress);
+ minProgress = std::min(minProgress, progress);
+ }
}
//std::cerr << "minProgress: " << minProgress << "\n";
return minProgress;
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
index 99f273a384a..15fbd5172cb 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
@@ -51,6 +51,7 @@
#include <vespa/vdslib/state/nodestate.h>
#include <vespa/config/subscription/configuri.h>
#include <list>
+#include <unordered_map>
namespace storage {
@@ -77,7 +78,7 @@ class StorageBucketDBInitializer : public StorageLink,
DoneInitializeHandler& _doneInitializeHandler;
ServiceLayerComponent _component;
const spi::PartitionStateList& _partitions;
- StorBucketDatabase& _bucketDatabase;
+ const ContentBucketSpaceRepo& _bucketSpaceRepo;
uint32_t _nodeIndex;
lib::Distribution& _distribution;
lib::NodeState _nodeState; // Disk info for ideal state calculations
@@ -87,6 +88,8 @@ class StorageBucketDBInitializer : public StorageLink,
DoneInitializeHandler& doneInitializeHandler,
ServiceLayerComponentRegister&,
const Config&);
+
+ StorBucketDatabase &getBucketDatabase(document::BucketSpace bucketSpace) const;
};
struct Metrics : public metrics::MetricSet {
metrics::LongCountMetric _wrongDisk;
@@ -126,12 +129,17 @@ class StorageBucketDBInitializer : public StorageLink,
~GlobalState();
};
+public:
+ using BucketSpaceReadState = std::unordered_map<document::BucketSpace,
+ std::unique_ptr<BucketReadState>, document::BucketSpace::hash>;
+ using ReadState = std::vector<std::unique_ptr<BucketSpaceReadState>>;
+private:
Config _config;
System _system;
Metrics _metrics;
GlobalState _state;
- std::vector<std::unique_ptr<BucketReadState>> _readState;
+ ReadState _readState;
public:
StorageBucketDBInitializer(const config::ConfigUri&,
@@ -186,7 +194,7 @@ public:
* Sends more read bucket info to a given disk. Lock must already be taken.
* Will be released by function prior to sending messages down.
*/
- void sendReadBucketInfo(spi::PartitionId);
+ void sendReadBucketInfo(spi::PartitionId, document::BucketSpace bucketSpace);
/** Check whether initialization is complete. Should hold lock to call it.*/
void checkIfDone();