summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-13 15:25:42 +0200
committerGitHub <noreply@github.com>2017-10-13 15:25:42 +0200
commit9fc068e627ae71c5478a91fc06fff4d62933efa1 (patch)
treeff70cfc7c73d3f3b664ac582d32f93a7e2da9ccb /storage
parent117b75ed03ea5d15ecd652b108ba58a6e8e1097a (diff)
parent26d17c63aa981266a36a654d22331be02a53d9dc (diff)
Merge pull request #3742 from vespa-engine/balder/looking-for-missing-join
Balder/looking for missing join
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp227
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h52
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp12
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.h3
7 files changed, 109 insertions, 224 deletions
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 0fd5a30d8bd..6be021a549c 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -98,8 +98,7 @@ PersistenceTestUtils::createPersistenceThread(uint32_t disk)
getEnv()._fileStorHandler,
getEnv()._metrics,
disk,
- 255,
- false));
+ 255));
}
document::Document::SP
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index b46217f6443..ac3d901fd65 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -89,8 +89,7 @@ FileStorManager::~FileStorManager()
}
void
-FileStorManager::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+FileStorManager::print(std::ostream& out, bool verbose, const std::string& indent) const
{
(void) verbose; (void) indent;
out << "FileStorManager";
@@ -128,20 +127,14 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < 4; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 255, false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 255)));
}
for (uint32_t j = 4; j < 6; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 100)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 100)));
}
}
@@ -149,12 +142,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u, thread %u with priority %d",
i, j, _config->threads[j].lowestpri);
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, _config->threads[j].lowestpri,
- false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
}
} else {
@@ -389,11 +378,9 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
}
bool
-FileStorManager::onMultiOperation(
- const std::shared_ptr<api::MultiOperationCommand>& cmd)
+FileStorManager::onMultiOperation(const std::shared_ptr<api::MultiOperationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -401,11 +388,9 @@ FileStorManager::onMultiOperation(
}
bool
-FileStorManager::onBatchPutRemove(
- const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
+FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -413,11 +398,9 @@ FileStorManager::onBatchPutRemove(
}
bool
-FileStorManager::onRemoveLocation(
- const std::shared_ptr<api::RemoveLocationCommand>& cmd)
+FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -425,11 +408,9 @@ FileStorManager::onRemoveLocation(
}
bool
-FileStorManager::onStatBucket(
- const std::shared_ptr<api::StatBucketCommand>& cmd)
+FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -478,8 +459,7 @@ FileStorManager::onCreateBucket(
}
}
}
- std::shared_ptr<api::CreateBucketReply> reply(
- (api::CreateBucketReply*)cmd->makeReply().release());
+ std::shared_ptr<api::CreateBucketReply> reply((api::CreateBucketReply*)cmd->makeReply().release());
reply->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive()));
reply->setResult(code);
sendUp(reply);
@@ -491,12 +471,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
{
uint16_t disk;
{
- StorBucketDatabase::WrappedEntry entry(
- _component.getBucketDatabase().get(
- cmd->getBucketId(), "FileStorManager::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(cmd->getBucketId(),
+ "FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
- LOG(debug, "%s was already deleted",
- cmd->getBucketId().toString().c_str());
+ LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
sendUp(reply);
return true;
@@ -520,10 +498,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
- static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(
- entry->getBucketInfo());
- reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
- ost.str()));
+ static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
sendUp(reply);
return true;
@@ -538,13 +514,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
disk = entry->disk;
entry.remove();
}
- _filestorHandler->failOperations(
- cmd->getBucketId(),
- disk,
- api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- vespalib::make_string(
- "Bucket %s about to be deleted anyway",
- cmd->getBucketId().toString().c_str())));
+ _filestorHandler->failOperations(cmd->getBucketId(), disk,
+ api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
+ vespalib::make_string("Bucket %s about to be deleted anyway",
+ cmd->getBucketId().toString().c_str())));
return true;
}
@@ -564,10 +537,7 @@ FileStorManager::ensureConsistentBucket(
// Don't create empty bucket if merge isn't allowed to continue.
entry.remove();
}
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::ABORTED,
- "bucket is inconsistently split");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split");
return StorBucketDatabase::WrappedEntry();
}
@@ -577,10 +547,8 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucketId(),
- *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucketId(), *cmd,
+ "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -589,26 +557,20 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
entry->disk = _component.getIdealPartition(cmd->getBucketId());
if (_partitions[entry->disk].isUp()) {
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to merge being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
// Call before writing bucket entry as we need to have bucket
// lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
- api::ReturnCode code(
- api::ReturnCode::IO_FAILURE,
+ api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
"Trying to perform merge %s whose bucket belongs on target disk %d, which is down. Cluster state version of command is %d, our system state version is %d",
- cmd->toString().c_str(),
- entry->disk,
- cmd->getClusterStateVersion(),
+ cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getSystemState()->getVersion()));
- LOGBT(debug, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
+ LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
reply->setResult(code);
sendUp(reply);
@@ -668,17 +630,13 @@ FileStorManager::onGetBucketDiff(
}
bool
-FileStorManager::validateApplyDiffCommandBucket(
- api::StorageMessage& msg,
- const StorBucketDatabase::WrappedEntry& entry)
+FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry& entry)
{
if (!entry.exist()) {
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
- replyDroppedOperation(msg,
- entry.getBucketId(),
- api::ReturnCode::ABORTED,
+ replyDroppedOperation(msg, entry.getBucketId(), api::ReturnCode::ABORTED,
"bucket became inconsistent during merging");
return false;
}
@@ -686,31 +644,26 @@ FileStorManager::validateApplyDiffCommandBucket(
}
bool
-FileStorManager::validateDiffReplyBucket(
- const StorBucketDatabase::WrappedEntry& entry,
- const document::BucketId& bucket)
+FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry,
+ const document::BucketId& bucket)
{
if (!entry.exist()) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
- "Bucket removed during merge"));
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Bucket removed during merge"));
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket became inconsistent during merging"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket became inconsistent during merging"));
return false;
}
return true;
}
bool
-FileStorManager::onGetBucketDiffReply(
- const shared_ptr<api::GetBucketDiffReply>& reply)
+FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *reply, reply->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId()));
if (validateDiffReplyBucket(entry, reply->getBucketId())) {
handlePersistenceMessage(reply, entry->disk);
}
@@ -718,11 +671,9 @@ FileStorManager::onGetBucketDiffReply(
}
bool
-FileStorManager::onApplyBucketDiff(
- const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
+FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (validateApplyDiffCommandBucket(*cmd, entry)) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -730,8 +681,7 @@ FileStorManager::onApplyBucketDiff(
}
bool
-FileStorManager::onApplyBucketDiffReply(
- const shared_ptr<api::ApplyBucketDiffReply>& reply)
+FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
*reply, reply->getBucketId()));
@@ -742,8 +692,7 @@ FileStorManager::onApplyBucketDiffReply(
}
bool
-FileStorManager::onJoinBuckets(
- const std::shared_ptr<api::JoinBucketsCommand>& cmd)
+FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(
cmd->getBucketId(), "FileStorManager::onJoinBuckets"));
@@ -757,11 +706,9 @@ FileStorManager::onJoinBuckets(
}
bool
-FileStorManager::onSplitBucket(
- const std::shared_ptr<api::SplitBucketCommand>& cmd)
+FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -772,8 +719,7 @@ bool
FileStorManager::onSetBucketState(
const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -786,10 +732,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
switch (msg->getType()) {
case GetIterCommand::ID:
{
- shared_ptr<GetIterCommand> cmd(
- std::static_pointer_cast<GetIterCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -797,10 +741,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case CreateIteratorCommand::ID:
{
- shared_ptr<CreateIteratorCommand> cmd(
- std::static_pointer_cast<CreateIteratorCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -808,28 +750,22 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case DestroyIteratorCommand::ID:
{
- spi::Context context(msg->getLoadType(), msg->getPriority(),
- msg->getTrace().getLevel());
- shared_ptr<DestroyIteratorCommand> cmd(
- std::static_pointer_cast<DestroyIteratorCommand>(msg));
+ spi::Context context(msg->getLoadType(), msg->getPriority(), msg->getTrace().getLevel());
+ shared_ptr<DestroyIteratorCommand> cmd(std::static_pointer_cast<DestroyIteratorCommand>(msg));
_provider->destroyIterator(cmd->getIteratorId(), context);
msg->getTrace().getRoot().addChild(context.getTrace().getRoot());
return true;
}
case ReadBucketList::ID:
{
- shared_ptr<ReadBucketList> cmd(
- std::static_pointer_cast<ReadBucketList>(msg));
-
+ shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg));
handlePersistenceMessage(cmd, cmd->getPartition());
return true;
}
case ReadBucketInfo::ID:
{
- shared_ptr<ReadBucketInfo> cmd(
- std::static_pointer_cast<ReadBucketInfo>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -837,10 +773,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case InternalBucketJoinCommand::ID:
{
- shared_ptr<InternalBucketJoinCommand> cmd(
- std::static_pointer_cast<InternalBucketJoinCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -848,10 +782,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RepairBucketCommand::ID:
{
- shared_ptr<RepairBucketCommand> cmd(
- std::static_pointer_cast<RepairBucketCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -859,10 +791,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case BucketDiskMoveCommand::ID:
{
- shared_ptr<BucketDiskMoveCommand> cmd(
- std::static_pointer_cast<BucketDiskMoveCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -870,10 +800,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RecheckBucketInfoCommand::ID:
{
- shared_ptr<RecheckBucketInfoCommand> cmd(
- std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -881,8 +809,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case AbortBucketOperationsCommand::ID:
{
- shared_ptr<AbortBucketOperationsCommand> cmd(
- std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
+ shared_ptr<AbortBucketOperationsCommand> cmd(std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
handleAbortBucketOperations(cmd);
return true;
}
@@ -892,8 +819,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
void
-FileStorManager::handleAbortBucketOperations(
- const shared_ptr<AbortBucketOperationsCommand>& cmd)
+FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
sendReply(api::StorageReply::SP(cmd->makeReply().release()));
@@ -925,8 +851,7 @@ FileStorManager::sendReply(const std::shared_ptr<api::StorageReply>& reply)
LOG(spam, "Sending reply %s", reply->toString().c_str());
if (reply->getType() == api::MessageType::INTERNAL_REPLY) {
- std::shared_ptr<api::InternalReply> rep(
- std::dynamic_pointer_cast<api::InternalReply>(reply));
+ std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply));
assert(rep.get());
if (onInternalReply(rep)) return;
}
@@ -1021,8 +946,7 @@ FileStorManager::isMerging(const document::BucketId& bucket) const
namespace {
struct Deactivator {
- StorBucketDatabase::Decision operator()(
- document::BucketId::Type, StorBucketDatabase::Entry& data)
+ StorBucketDatabase::Decision operator()(document::BucketId::Type, StorBucketDatabase::Entry& data)
{
data.info.setActive(false);
return StorBucketDatabase::UPDATE;
@@ -1034,21 +958,16 @@ void
FileStorManager::updateState()
{
lib::ClusterState::CSP state(_component.getStateUpdater().getSystemState());
- spi::ClusterState spiState(
- *state, _component.getIndex(), *_component.getDistribution());
+ spi::ClusterState spiState(*state, _component.getIndex(), *_component.getDistribution());
lib::Node node(_component.getNodeType(), _component.getIndex());
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
- LOG(debug, "FileStorManager received cluster state '%s'",
- state->toString().c_str());
+ LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
// If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
- LOG(debug,
- "Received cluster state where this node is down; "
- "de-activating all buckets in database");
+ LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;
- _component.getBucketDatabase().all(
- deactivator, "FileStorManager::updateState");
+ _component.getBucketDatabase().all(deactivator, "FileStorManager::updateState");
}
_provider->setClusterState(spiState);
_nodeUpInLastNodeStateSeenByProvider = nodeUp;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 0ab512cd63f..11f8b5d1cf4 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "testandsethelper.h"
#include <vespa/storageapi/message/multioperation.h>
-#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -24,15 +23,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread)
- : _env(configUri,
- compReg,
- filestorHandler,
- metrics,
- deviceIndex,
- lowestPriority,
- provider),
+ uint8_t lowestPriority)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -43,13 +35,10 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
_flushMonitor(),
_closed(false)
{
- (void) startThread;
std::ostringstream threadName;
- threadName << "Disk " << _env._partition << " thread "
- << (void*) this;
+ threadName << "Disk " << _env._partition << " thread " << (void*) this;
_component.reset(new ServiceLayerComponent(compReg, threadName.str()));
- _bucketOwnershipNotifier.reset(
- new BucketOwnershipNotifier(*_component, filestorHandler));
+ _bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler));
framework::MilliSecTime maxProcessingTime(60 * 1000);
framework::MilliSecTime waitTime(1000);
_thread = _component->startThread(*this, maxProcessingTime, waitTime);
@@ -57,8 +46,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
PersistenceThread::~PersistenceThread()
{
- LOG(debug, "Shutting down persistence thread. Waiting for current "
- "operation to finish.");
+ LOG(debug, "Shutting down persistence thread. Waiting for current operation to finish.");
_thread->interrupt();
LOG(debug, "Waiting for thread to terminate.");
_thread->join();
@@ -66,8 +54,7 @@ PersistenceThread::~PersistenceThread()
}
spi::Bucket
-PersistenceThread::getBucket(const DocumentId& id,
- const BucketId& bucket) const
+PersistenceThread::getBucket(const DocumentId& id, const BucketId& bucket) const
{
BucketId docBucket(_env._bucketFactory.getBucketId(id));
docBucket.setUsedBits(bucket.getUsedBits());
@@ -82,8 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id,
}
bool
-PersistenceThread::checkForError(const spi::Result& response,
- MessageTracker& tracker)
+PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker)
{
uint32_t code = _env.convertErrorCode(response);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 218d2f7dd23..4d714dc878a 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -16,31 +16,22 @@ namespace storage {
class BucketOwnershipNotifier;
class TestAndSetHelper;
-class PersistenceThread : public DiskThread, public Types
+class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(ServiceLayerComponentRegister&,
- const config::ConfigUri & configUri,
- spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread = false);
+ PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
+ spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
+ FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority);
~PersistenceThread();
/** Waits for current operation to be finished. */
void flush() override;
-
- bool isMerging(const BucketId& bucket) const;
-
framework::Thread& getThread() override { return *_thread; }
MessageTracker::UP handlePut(api::PutCommand& cmd);
MessageTracker::UP handleRemove(api::RemoveCommand& cmd);
MessageTracker::UP handleUpdate(api::UpdateCommand& cmd);
MessageTracker::UP handleGet(api::GetCommand& cmd);
-
MessageTracker::UP handleMultiOperation(api::MultiOperationCommand& cmd);
MessageTracker::UP handleRevert(api::RevertCommand& cmd);
MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd);
@@ -57,37 +48,31 @@ public:
MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
private:
- PersistenceUtil _env;
- uint32_t _warnOnSlowOperations;
-
+ PersistenceUtil _env;
+ uint32_t _warnOnSlowOperations;
spi::PersistenceProvider& _spi;
- ProcessAllHandler _processAllHandler;
- MergeHandler _mergeHandler;
- DiskMoveOperationHandler _diskMoveHandler;
+ ProcessAllHandler _processAllHandler;
+ MergeHandler _mergeHandler;
+ DiskMoveOperationHandler _diskMoveHandler;
ServiceLayerComponent::UP _component;
- framework::Thread::UP _thread;
- spi::Context _context;
+ framework::Thread::UP _thread;
+ spi::Context _context;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
+ vespalib::Monitor _flushMonitor;
+ bool _closed;
- vespalib::Monitor _flushMonitor;
- bool _closed;
-
- void setBucketInfo(MessageTracker& tracker,
- const document::BucketId& bucketId);
+ void setBucketInfo(MessageTracker& tracker, const document::BucketId& bucketId);
- bool checkProviderBucketInfoMatches(const spi::Bucket&,
- const api::BucketInfo&) const;
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- void updateBucketDatabase(const document::BucketId& id,
- const api::BucketInfo& info);
+ void updateBucketDatabase(const document::BucketId& id, const api::BucketInfo& info);
/**
* Sanity-checking of join command parameters. Invokes tracker.fail() with
* an appropriate error and returns false iff the command does not validate
* OK. Returns true and does not touch the tracker otherwise.
*/
- bool validateJoinCommand(const api::JoinBucketsCommand& cmd,
- MessageTracker& tracker) const;
+ bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const;
// Message handling functions
MessageTracker::UP handleCommand(api::StorageCommand&);
@@ -102,8 +87,7 @@ private:
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const;
- void flushAllReplies(const document::BucketId& bucketId,
- std::vector<MessageTracker::UP>& trackers);
+ void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers);
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index e2861ef42cd..9faf380d2ba 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -28,13 +28,11 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu
namespace storage {
-ServiceLayerNode::ServiceLayerNode(
- const config::ConfigUri & configUri,
- ServiceLayerNodeContext& context,
- ApplicationGenerationFetcher& generationFetcher,
- spi::PersistenceProvider& persistenceProvider,
- const VisitorFactory::Map& externalVisitors)
- : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)),
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context,
+ ApplicationGenerationFetcher& generationFetcher,
+ spi::PersistenceProvider& persistenceProvider,
+ const VisitorFactory::Map& externalVisitors)
+ : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()),
_context(context),
_persistenceProvider(persistenceProvider),
_partitions(0),
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
index 5f4ceff2a6b..2e1aa52e68d 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
@@ -4,16 +4,16 @@
#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h>
-using storage::framework::defaultimplementation::AllocationLogic;
-
namespace storage {
+using framework::defaultimplementation::AllocationLogic;
+using framework::defaultimplementation::PriorityMemoryLogic;
+
StorageNodeContext::StorageNodeContext(ComponentRegister::UP compReg, framework::Clock::UP clock)
: _componentRegister(std::move(compReg)),
_clock(std::move(clock)),
_threadPool(*_clock),
- _memoryLogic(new framework::defaultimplementation::PriorityMemoryLogic(
- *_clock, 1024 * 1024 * 1024)),
+ _memoryLogic(new PriorityMemoryLogic(*_clock, 1024 * 1024 * 1024)),
_memoryManager(AllocationLogic::UP(_memoryLogic))
{
_componentRegister->setClock(*_clock);
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h
index 7d6f3b0aef5..0149f975f63 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.h
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h
@@ -53,8 +53,7 @@ struct StorageNodeContext {
protected:
// Initialization has been split in two as subclass needs to initialize
// component register before sending it on.
- StorageNodeContext(ComponentRegister::UP,
- framework::Clock::UP);
+ StorageNodeContext(ComponentRegister::UP, framework::Clock::UP);
private:
ComponentRegister::UP _componentRegister;