summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-13 11:32:52 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-13 11:51:00 +0200
commitac1d8b274911cf1f71e751646ca6aed260213934 (patch)
tree6066792bd6244e8393a4fbc700d9c4f99596372b
parent025b759b5cf0146cd0cf3f44f686eeace1b28d09 (diff)
Cleanup while looking for missing join.
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp227
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h18
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp12
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.h3
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp48
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h10
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.cpp22
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.h2
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp4
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h5
12 files changed, 132 insertions, 255 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index b46217f6443..ac3d901fd65 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -89,8 +89,7 @@ FileStorManager::~FileStorManager()
}
void
-FileStorManager::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+FileStorManager::print(std::ostream& out, bool verbose, const std::string& indent) const
{
(void) verbose; (void) indent;
out << "FileStorManager";
@@ -128,20 +127,14 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < 4; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 255, false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 255)));
}
for (uint32_t j = 4; j < 6; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 100)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 100)));
}
}
@@ -149,12 +142,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u, thread %u with priority %d",
i, j, _config->threads[j].lowestpri);
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, _config->threads[j].lowestpri,
- false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
}
} else {
@@ -389,11 +378,9 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
}
bool
-FileStorManager::onMultiOperation(
- const std::shared_ptr<api::MultiOperationCommand>& cmd)
+FileStorManager::onMultiOperation(const std::shared_ptr<api::MultiOperationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -401,11 +388,9 @@ FileStorManager::onMultiOperation(
}
bool
-FileStorManager::onBatchPutRemove(
- const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
+FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -413,11 +398,9 @@ FileStorManager::onBatchPutRemove(
}
bool
-FileStorManager::onRemoveLocation(
- const std::shared_ptr<api::RemoveLocationCommand>& cmd)
+FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -425,11 +408,9 @@ FileStorManager::onRemoveLocation(
}
bool
-FileStorManager::onStatBucket(
- const std::shared_ptr<api::StatBucketCommand>& cmd)
+FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -478,8 +459,7 @@ FileStorManager::onCreateBucket(
}
}
}
- std::shared_ptr<api::CreateBucketReply> reply(
- (api::CreateBucketReply*)cmd->makeReply().release());
+ std::shared_ptr<api::CreateBucketReply> reply((api::CreateBucketReply*)cmd->makeReply().release());
reply->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive()));
reply->setResult(code);
sendUp(reply);
@@ -491,12 +471,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
{
uint16_t disk;
{
- StorBucketDatabase::WrappedEntry entry(
- _component.getBucketDatabase().get(
- cmd->getBucketId(), "FileStorManager::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(cmd->getBucketId(),
+ "FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
- LOG(debug, "%s was already deleted",
- cmd->getBucketId().toString().c_str());
+ LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
sendUp(reply);
return true;
@@ -520,10 +498,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
- static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(
- entry->getBucketInfo());
- reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
- ost.str()));
+ static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
sendUp(reply);
return true;
@@ -538,13 +514,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
disk = entry->disk;
entry.remove();
}
- _filestorHandler->failOperations(
- cmd->getBucketId(),
- disk,
- api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- vespalib::make_string(
- "Bucket %s about to be deleted anyway",
- cmd->getBucketId().toString().c_str())));
+ _filestorHandler->failOperations(cmd->getBucketId(), disk,
+ api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
+ vespalib::make_string("Bucket %s about to be deleted anyway",
+ cmd->getBucketId().toString().c_str())));
return true;
}
@@ -564,10 +537,7 @@ FileStorManager::ensureConsistentBucket(
// Don't create empty bucket if merge isn't allowed to continue.
entry.remove();
}
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::ABORTED,
- "bucket is inconsistently split");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split");
return StorBucketDatabase::WrappedEntry();
}
@@ -577,10 +547,8 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucketId(),
- *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucketId(), *cmd,
+ "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -589,26 +557,20 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
entry->disk = _component.getIdealPartition(cmd->getBucketId());
if (_partitions[entry->disk].isUp()) {
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to merge being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
// Call before writing bucket entry as we need to have bucket
// lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
- api::ReturnCode code(
- api::ReturnCode::IO_FAILURE,
+ api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
"Trying to perform merge %s whose bucket belongs on target disk %d, which is down. Cluster state version of command is %d, our system state version is %d",
- cmd->toString().c_str(),
- entry->disk,
- cmd->getClusterStateVersion(),
+ cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getSystemState()->getVersion()));
- LOGBT(debug, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
+ LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
reply->setResult(code);
sendUp(reply);
@@ -668,17 +630,13 @@ FileStorManager::onGetBucketDiff(
}
bool
-FileStorManager::validateApplyDiffCommandBucket(
- api::StorageMessage& msg,
- const StorBucketDatabase::WrappedEntry& entry)
+FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry& entry)
{
if (!entry.exist()) {
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
- replyDroppedOperation(msg,
- entry.getBucketId(),
- api::ReturnCode::ABORTED,
+ replyDroppedOperation(msg, entry.getBucketId(), api::ReturnCode::ABORTED,
"bucket became inconsistent during merging");
return false;
}
@@ -686,31 +644,26 @@ FileStorManager::validateApplyDiffCommandBucket(
}
bool
-FileStorManager::validateDiffReplyBucket(
- const StorBucketDatabase::WrappedEntry& entry,
- const document::BucketId& bucket)
+FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry,
+ const document::BucketId& bucket)
{
if (!entry.exist()) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
- "Bucket removed during merge"));
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Bucket removed during merge"));
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket became inconsistent during merging"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket became inconsistent during merging"));
return false;
}
return true;
}
bool
-FileStorManager::onGetBucketDiffReply(
- const shared_ptr<api::GetBucketDiffReply>& reply)
+FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *reply, reply->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId()));
if (validateDiffReplyBucket(entry, reply->getBucketId())) {
handlePersistenceMessage(reply, entry->disk);
}
@@ -718,11 +671,9 @@ FileStorManager::onGetBucketDiffReply(
}
bool
-FileStorManager::onApplyBucketDiff(
- const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
+FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (validateApplyDiffCommandBucket(*cmd, entry)) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -730,8 +681,7 @@ FileStorManager::onApplyBucketDiff(
}
bool
-FileStorManager::onApplyBucketDiffReply(
- const shared_ptr<api::ApplyBucketDiffReply>& reply)
+FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
*reply, reply->getBucketId()));
@@ -742,8 +692,7 @@ FileStorManager::onApplyBucketDiffReply(
}
bool
-FileStorManager::onJoinBuckets(
- const std::shared_ptr<api::JoinBucketsCommand>& cmd)
+FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(
cmd->getBucketId(), "FileStorManager::onJoinBuckets"));
@@ -757,11 +706,9 @@ FileStorManager::onJoinBuckets(
}
bool
-FileStorManager::onSplitBucket(
- const std::shared_ptr<api::SplitBucketCommand>& cmd)
+FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -772,8 +719,7 @@ bool
FileStorManager::onSetBucketState(
const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -786,10 +732,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
switch (msg->getType()) {
case GetIterCommand::ID:
{
- shared_ptr<GetIterCommand> cmd(
- std::static_pointer_cast<GetIterCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -797,10 +741,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case CreateIteratorCommand::ID:
{
- shared_ptr<CreateIteratorCommand> cmd(
- std::static_pointer_cast<CreateIteratorCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -808,28 +750,22 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case DestroyIteratorCommand::ID:
{
- spi::Context context(msg->getLoadType(), msg->getPriority(),
- msg->getTrace().getLevel());
- shared_ptr<DestroyIteratorCommand> cmd(
- std::static_pointer_cast<DestroyIteratorCommand>(msg));
+ spi::Context context(msg->getLoadType(), msg->getPriority(), msg->getTrace().getLevel());
+ shared_ptr<DestroyIteratorCommand> cmd(std::static_pointer_cast<DestroyIteratorCommand>(msg));
_provider->destroyIterator(cmd->getIteratorId(), context);
msg->getTrace().getRoot().addChild(context.getTrace().getRoot());
return true;
}
case ReadBucketList::ID:
{
- shared_ptr<ReadBucketList> cmd(
- std::static_pointer_cast<ReadBucketList>(msg));
-
+ shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg));
handlePersistenceMessage(cmd, cmd->getPartition());
return true;
}
case ReadBucketInfo::ID:
{
- shared_ptr<ReadBucketInfo> cmd(
- std::static_pointer_cast<ReadBucketInfo>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -837,10 +773,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case InternalBucketJoinCommand::ID:
{
- shared_ptr<InternalBucketJoinCommand> cmd(
- std::static_pointer_cast<InternalBucketJoinCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -848,10 +782,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RepairBucketCommand::ID:
{
- shared_ptr<RepairBucketCommand> cmd(
- std::static_pointer_cast<RepairBucketCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -859,10 +791,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case BucketDiskMoveCommand::ID:
{
- shared_ptr<BucketDiskMoveCommand> cmd(
- std::static_pointer_cast<BucketDiskMoveCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -870,10 +800,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RecheckBucketInfoCommand::ID:
{
- shared_ptr<RecheckBucketInfoCommand> cmd(
- std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -881,8 +809,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case AbortBucketOperationsCommand::ID:
{
- shared_ptr<AbortBucketOperationsCommand> cmd(
- std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
+ shared_ptr<AbortBucketOperationsCommand> cmd(std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
handleAbortBucketOperations(cmd);
return true;
}
@@ -892,8 +819,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
void
-FileStorManager::handleAbortBucketOperations(
- const shared_ptr<AbortBucketOperationsCommand>& cmd)
+FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
sendReply(api::StorageReply::SP(cmd->makeReply().release()));
@@ -925,8 +851,7 @@ FileStorManager::sendReply(const std::shared_ptr<api::StorageReply>& reply)
LOG(spam, "Sending reply %s", reply->toString().c_str());
if (reply->getType() == api::MessageType::INTERNAL_REPLY) {
- std::shared_ptr<api::InternalReply> rep(
- std::dynamic_pointer_cast<api::InternalReply>(reply));
+ std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply));
assert(rep.get());
if (onInternalReply(rep)) return;
}
@@ -1021,8 +946,7 @@ FileStorManager::isMerging(const document::BucketId& bucket) const
namespace {
struct Deactivator {
- StorBucketDatabase::Decision operator()(
- document::BucketId::Type, StorBucketDatabase::Entry& data)
+ StorBucketDatabase::Decision operator()(document::BucketId::Type, StorBucketDatabase::Entry& data)
{
data.info.setActive(false);
return StorBucketDatabase::UPDATE;
@@ -1034,21 +958,16 @@ void
FileStorManager::updateState()
{
lib::ClusterState::CSP state(_component.getStateUpdater().getSystemState());
- spi::ClusterState spiState(
- *state, _component.getIndex(), *_component.getDistribution());
+ spi::ClusterState spiState(*state, _component.getIndex(), *_component.getDistribution());
lib::Node node(_component.getNodeType(), _component.getIndex());
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
- LOG(debug, "FileStorManager received cluster state '%s'",
- state->toString().c_str());
+ LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
// If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
- LOG(debug,
- "Received cluster state where this node is down; "
- "de-activating all buckets in database");
+ LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;
- _component.getBucketDatabase().all(
- deactivator, "FileStorManager::updateState");
+ _component.getBucketDatabase().all(deactivator, "FileStorManager::updateState");
}
_provider->setClusterState(spiState);
_nodeUpInLastNodeStateSeenByProvider = nodeUp;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 0ab512cd63f..11f8b5d1cf4 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "testandsethelper.h"
#include <vespa/storageapi/message/multioperation.h>
-#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -24,15 +23,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread)
- : _env(configUri,
- compReg,
- filestorHandler,
- metrics,
- deviceIndex,
- lowestPriority,
- provider),
+ uint8_t lowestPriority)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -43,13 +35,10 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
_flushMonitor(),
_closed(false)
{
- (void) startThread;
std::ostringstream threadName;
- threadName << "Disk " << _env._partition << " thread "
- << (void*) this;
+ threadName << "Disk " << _env._partition << " thread " << (void*) this;
_component.reset(new ServiceLayerComponent(compReg, threadName.str()));
- _bucketOwnershipNotifier.reset(
- new BucketOwnershipNotifier(*_component, filestorHandler));
+ _bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler));
framework::MilliSecTime maxProcessingTime(60 * 1000);
framework::MilliSecTime waitTime(1000);
_thread = _component->startThread(*this, maxProcessingTime, waitTime);
@@ -57,8 +46,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
PersistenceThread::~PersistenceThread()
{
- LOG(debug, "Shutting down persistence thread. Waiting for current "
- "operation to finish.");
+ LOG(debug, "Shutting down persistence thread. Waiting for current operation to finish.");
_thread->interrupt();
LOG(debug, "Waiting for thread to terminate.");
_thread->join();
@@ -66,8 +54,7 @@ PersistenceThread::~PersistenceThread()
}
spi::Bucket
-PersistenceThread::getBucket(const DocumentId& id,
- const BucketId& bucket) const
+PersistenceThread::getBucket(const DocumentId& id, const BucketId& bucket) const
{
BucketId docBucket(_env._bucketFactory.getBucketId(id));
docBucket.setUsedBits(bucket.getUsedBits());
@@ -82,8 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id,
}
bool
-PersistenceThread::checkForError(const spi::Result& response,
- MessageTracker& tracker)
+PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker)
{
uint32_t code = _env.convertErrorCode(response);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 218d2f7dd23..7719f796e3d 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -25,8 +25,7 @@ public:
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread = false);
+ uint8_t lowestPriority);
~PersistenceThread();
/** Waits for current operation to be finished. */
@@ -72,22 +71,18 @@ private:
vespalib::Monitor _flushMonitor;
bool _closed;
- void setBucketInfo(MessageTracker& tracker,
- const document::BucketId& bucketId);
+ void setBucketInfo(MessageTracker& tracker, const document::BucketId& bucketId);
- bool checkProviderBucketInfoMatches(const spi::Bucket&,
- const api::BucketInfo&) const;
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- void updateBucketDatabase(const document::BucketId& id,
- const api::BucketInfo& info);
+ void updateBucketDatabase(const document::BucketId& id, const api::BucketInfo& info);
/**
* Sanity-checking of join command parameters. Invokes tracker.fail() with
* an appropriate error and returns false iff the command does not validate
* OK. Returns true and does not touch the tracker otherwise.
*/
- bool validateJoinCommand(const api::JoinBucketsCommand& cmd,
- MessageTracker& tracker) const;
+ bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const;
// Message handling functions
MessageTracker::UP handleCommand(api::StorageCommand&);
@@ -102,8 +97,7 @@ private:
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const;
- void flushAllReplies(const document::BucketId& bucketId,
- std::vector<MessageTracker::UP>& trackers);
+ void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers);
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index e2861ef42cd..9faf380d2ba 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -28,13 +28,11 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu
namespace storage {
-ServiceLayerNode::ServiceLayerNode(
- const config::ConfigUri & configUri,
- ServiceLayerNodeContext& context,
- ApplicationGenerationFetcher& generationFetcher,
- spi::PersistenceProvider& persistenceProvider,
- const VisitorFactory::Map& externalVisitors)
- : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)),
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context,
+ ApplicationGenerationFetcher& generationFetcher,
+ spi::PersistenceProvider& persistenceProvider,
+ const VisitorFactory::Map& externalVisitors)
+ : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()),
_context(context),
_persistenceProvider(persistenceProvider),
_partitions(0),
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
index 5f4ceff2a6b..2e1aa52e68d 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
@@ -4,16 +4,16 @@
#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h>
-using storage::framework::defaultimplementation::AllocationLogic;
-
namespace storage {
+using framework::defaultimplementation::AllocationLogic;
+using framework::defaultimplementation::PriorityMemoryLogic;
+
StorageNodeContext::StorageNodeContext(ComponentRegister::UP compReg, framework::Clock::UP clock)
: _componentRegister(std::move(compReg)),
_clock(std::move(clock)),
_threadPool(*_clock),
- _memoryLogic(new framework::defaultimplementation::PriorityMemoryLogic(
- *_clock, 1024 * 1024 * 1024)),
+ _memoryLogic(new PriorityMemoryLogic(*_clock, 1024 * 1024 * 1024)),
_memoryManager(AllocationLogic::UP(_memoryLogic))
{
_componentRegister->setClock(*_clock);
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h
index 7d6f3b0aef5..0149f975f63 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.h
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h
@@ -53,8 +53,7 @@ struct StorageNodeContext {
protected:
// Initialization has been split in two as subclass needs to initialize
// component register before sending it on.
- StorageNodeContext(ComponentRegister::UP,
- framework::Clock::UP);
+ StorageNodeContext(ComponentRegister::UP, framework::Clock::UP);
private:
ComponentRegister::UP _componentRegister;
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index eae02c71cfb..09c805b2b85 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -3,12 +3,12 @@
#include "threadpoolimpl.h"
#include "threadimpl.h"
#include <vespa/vespalib/util/exceptions.h>
+#include <thread>
+using namespace std::chrono_literals;
using vespalib::IllegalStateException;
-namespace storage {
-namespace framework {
-namespace defaultimplementation {
+namespace storage::framework::defaultimplementation {
ThreadPoolImpl::ThreadPoolImpl(Clock& clock)
: _backendThreadPool(512 * 1024),
@@ -21,11 +21,11 @@ ThreadPoolImpl::~ThreadPoolImpl()
{
vespalib::LockGuard lock(_threadVectorLock);
_stopping = true;
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- _threads[i]->interrupt();
+ for (ThreadImpl * thread : _threads) {
+ thread->interrupt();
}
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- _threads[i]->join();
+ for (ThreadImpl * thread : _threads) {
+ thread->join();
}
}
for (uint32_t i=0; true; i+=10) {
@@ -34,30 +34,25 @@ ThreadPoolImpl::~ThreadPoolImpl()
if (_threads.empty()) break;
}
if (i > 1000) {
- fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And "
- "if allowing thread pool object to be deleted this "
- "will create a segfault later)\n");
+ fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And if allowing thread pool object"
+ " to be deleted this will create a segfault later)\n");
abort();
}
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(10ms);
}
_backendThreadPool.Close();
}
Thread::UP
-ThreadPoolImpl::startThread(Runnable& runnable,
- vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTime,
- int ticksBeforeWait)
+ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs,
+ uint64_t maxProcessTime, int ticksBeforeWait)
{
vespalib::LockGuard lock(_threadVectorLock);
if (_stopping) {
- throw vespalib::IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
+ throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
}
ThreadImpl* ti;
- Thread::UP t(ti = new ThreadImpl(
- *this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
+ Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
_threads.push_back(ti);
return t;
}
@@ -66,9 +61,8 @@ void
ThreadPoolImpl::visitThreads(ThreadVisitor& visitor) const
{
vespalib::LockGuard lock(_threadVectorLock);
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- visitor.visitThread(_threads[i]->getId(), _threads[i]->getProperties(),
- _threads[i]->getTickData());
+ for (const ThreadImpl * thread : _threads) {
+ visitor.visitThread(thread->getId(), thread->getProperties(), thread->getTickData());
}
}
@@ -78,14 +72,12 @@ ThreadPoolImpl::unregisterThread(ThreadImpl& t)
vespalib::LockGuard lock(_threadVectorLock);
std::vector<ThreadImpl*> threads;
threads.reserve(_threads.size());
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- if (_threads[i] != &t) {
- threads.push_back(_threads[i]);
+ for (ThreadImpl * thread : _threads) {
+ if (thread != &t) {
+ threads.push_back(thread);
}
}
_threads.swap(threads);
}
-} // defaultimplementation
-} // framework
-} // storage
+}
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index 2b16836eba2..4e973c2ad20 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -12,11 +12,11 @@ class ThreadImpl;
struct ThreadPoolImpl : public ThreadPool
{
- FastOS_ThreadPool _backendThreadPool;
- std::vector<ThreadImpl*> _threads;
- vespalib::Lock _threadVectorLock;
- Clock& _clock;
- bool _stopping;
+ FastOS_ThreadPool _backendThreadPool;
+ std::vector<ThreadImpl*> _threads;
+ vespalib::Lock _threadVectorLock;
+ Clock & _clock;
+ bool _stopping;
public:
ThreadPoolImpl(Clock&);
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp
index a35cad68b00..869df4296ef 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.cpp
+++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp
@@ -33,9 +33,7 @@ Component::Component(ComponentRegister& cr, vespalib::stringref name)
cr.registerComponent(*this);
}
-Component::~Component()
-{
-}
+Component::~Component() = default;
void
Component::registerComponentStateListener(ComponentStateListener& l)
@@ -67,8 +65,7 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period)
assert(_metricUpdateHook.first == 0);
_metricUpdateHook = std::make_pair(&hook, period);
if (_metricReg != 0) {
- _metricReg->registerUpdateHook(
- _name, *_metricUpdateHook.first, _metricUpdateHook.second);
+ _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
}
@@ -86,8 +83,7 @@ void
Component::setMetricRegistrator(MetricRegistrator& mr) {
_metricReg = &mr;
if (_metricUpdateHook.first != 0) {
- _metricReg->registerUpdateHook(
- _name, *_metricUpdateHook.first, _metricUpdateHook.second);
+ _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
if (_metric != 0) {
_metricReg->registerMetric(*_metric);
@@ -117,16 +113,10 @@ Component::getClock() const
// Helper functions for components wanting to start a single thread.
Thread::UP
-Component::startThread(Runnable& runnable,
- MilliSecTime waitTime,
- MilliSecTime maxProcessTime,
- int ticksBeforeWait)
+Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait)
{
- return getThreadPool().startThread(runnable,
- getName(),
- waitTime.getTime(),
- maxProcessTime.getTime(),
- ticksBeforeWait);
+ return getThreadPool().startThread(runnable, getName(), waitTime.getTime(),
+ maxProcessTime.getTime(), ticksBeforeWait);
}
void
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h
index 8a65d186557..b16e31290b8 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.h
+++ b/storageframework/src/vespa/storageframework/generic/component/component.h
@@ -79,7 +79,7 @@ namespace storage::framework {
class ComponentRegister;
struct ComponentStateListener {
- virtual ~ComponentStateListener() {}
+ virtual ~ComponentStateListener() = default;
virtual void onOpen() {}
virtual void onClose() {}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index 20868e79fce..2ca29223254 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -17,7 +17,7 @@ void
ServiceLayerProcess::shutdown()
{
Process::shutdown();
- _node.reset(0);
+ _node.reset();
}
void
@@ -25,7 +25,7 @@ ServiceLayerProcess::createNode()
{
_externalVisitors["searchvisitor"].reset(new SearchVisitorFactory(_configUri));
setupProvider();
- _node.reset(new ServiceLayerNode(_configUri, _context, *this, getProvider(), _externalVisitors));
+ _node = std::make_unique<ServiceLayerNode>(_configUri, _context, *this, getProvider(), _externalVisitors);
_node->init();
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index 0a002db114b..af991c00f03 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -19,9 +19,8 @@
#include "process.h"
#include <vespa/storage/storageserver/servicelayernodecontext.h>
#include <vespa/storage/common/visitorfactory.h>
-#include <vespa/config/config.h>
-#include <vespa/config/helper/configfetcher.h>
-#include <vespa/config-persistence.h>
+
+namespace config { class ConfigUri; }
namespace storage {