diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-13 11:32:52 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-13 11:51:00 +0200 |
commit | ac1d8b274911cf1f71e751646ca6aed260213934 (patch) | |
tree | 6066792bd6244e8393a4fbc700d9c4f99596372b /storage | |
parent | 025b759b5cf0146cd0cf3f44f686eeace1b28d09 (diff) |
Cleanup while looking for missing join.
Diffstat (limited to 'storage')
6 files changed, 96 insertions, 200 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index b46217f6443..ac3d901fd65 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -89,8 +89,7 @@ FileStorManager::~FileStorManager() } void -FileStorManager::print(std::ostream& out, bool verbose, - const std::string& indent) const +FileStorManager::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; out << "FileStorManager"; @@ -128,20 +127,14 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC LOG(spam, "Setting up disk %u", i); for (uint32_t j = 0; j < 4; j++) { _disks[i].push_back(DiskThread::SP( - new PersistenceThread( - _compReg, _configUri, *_provider, - *_filestorHandler, - *_metrics->disks[i]->threads[j], - i, 255, false))); + new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i, 255))); } for (uint32_t j = 4; j < 6; j++) { _disks[i].push_back(DiskThread::SP( - new PersistenceThread( - _compReg, _configUri, *_provider, - *_filestorHandler, - *_metrics->disks[i]->threads[j], - i, 100))); + new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i, 100))); } } @@ -149,12 +142,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC LOG(spam, "Setting up disk %u, thread %u with priority %d", i, j, _config->threads[j].lowestpri); _disks[i].push_back(DiskThread::SP( - new PersistenceThread( - _compReg, _configUri, *_provider, - *_filestorHandler, - *_metrics->disks[i]->threads[j], - i, _config->threads[j].lowestpri, - false))); + new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri))); } } else { @@ -389,11 +378,9 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd) } bool -FileStorManager::onMultiOperation( - const std::shared_ptr<api::MultiOperationCommand>& cmd) +FileStorManager::onMultiOperation(const std::shared_ptr<api::MultiOperationCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, 0)); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -401,11 +388,9 @@ FileStorManager::onMultiOperation( } bool -FileStorManager::onBatchPutRemove( - const std::shared_ptr<api::BatchPutRemoveCommand>& cmd) +FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, 0)); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -413,11 +398,9 @@ FileStorManager::onBatchPutRemove( } bool -FileStorManager::onRemoveLocation( - const std::shared_ptr<api::RemoveLocationCommand>& cmd) +FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -425,11 +408,9 @@ FileStorManager::onRemoveLocation( } bool -FileStorManager::onStatBucket( - const std::shared_ptr<api::StatBucketCommand>& cmd) +FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -478,8 +459,7 @@ FileStorManager::onCreateBucket( } } } - std::shared_ptr<api::CreateBucketReply> reply( - (api::CreateBucketReply*)cmd->makeReply().release()); + std::shared_ptr<api::CreateBucketReply> reply((api::CreateBucketReply*)cmd->makeReply().release()); reply->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive())); reply->setResult(code); sendUp(reply); @@ -491,12 +471,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) { uint16_t disk; { - StorBucketDatabase::WrappedEntry entry( - _component.getBucketDatabase().get( - cmd->getBucketId(), "FileStorManager::onDeleteBucket")); + StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(cmd->getBucketId(), + "FileStorManager::onDeleteBucket")); if (!entry.exist()) { - LOG(debug, "%s was already deleted", - cmd->getBucketId().toString().c_str()); + LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str()); std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); sendUp(reply); return true; @@ -520,10 +498,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str()); std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); - static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo( - entry->getBucketInfo()); - reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, - ost.str())); + static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo()); + reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); entry.unlock(); sendUp(reply); return true; @@ -538,13 +514,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) disk = entry->disk; entry.remove(); } - _filestorHandler->failOperations( - cmd->getBucketId(), - disk, - api::ReturnCode(api::ReturnCode::BUCKET_DELETED, - vespalib::make_string( - "Bucket %s about to be deleted anyway", - cmd->getBucketId().toString().c_str()))); + _filestorHandler->failOperations(cmd->getBucketId(), disk, + api::ReturnCode(api::ReturnCode::BUCKET_DELETED, + vespalib::make_string("Bucket %s about to be deleted anyway", + cmd->getBucketId().toString().c_str()))); return true; } @@ -564,10 +537,7 @@ FileStorManager::ensureConsistentBucket( // Don't create empty bucket if merge isn't allowed to continue. entry.remove(); } - replyDroppedOperation(msg, - bucket, - api::ReturnCode::ABORTED, - "bucket is inconsistently split"); + replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split"); return StorBucketDatabase::WrappedEntry(); } @@ -577,10 +547,8 @@ FileStorManager::ensureConsistentBucket( bool FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry( - ensureConsistentBucket(cmd->getBucketId(), - *cmd, - "FileStorManager::onMergeBucket")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucketId(), *cmd, + "FileStorManager::onMergeBucket")); if (!entry.exist()) { return true; } @@ -589,26 +557,20 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) entry->disk = _component.getIdealPartition(cmd->getBucketId()); if (_partitions[entry->disk].isUp()) { entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); - LOG(debug, "Created bucket %s on disk %d (node index is %d) due " - "to merge being received.", - cmd->getBucketId().toString().c_str(), - entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.", + cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); // Call before writing bucket entry as we need to have bucket // lock while calling handlePersistenceMessage(cmd, entry->disk); entry.write(); } else { entry.remove(); - api::ReturnCode code( - api::ReturnCode::IO_FAILURE, + api::ReturnCode code(api::ReturnCode::IO_FAILURE, vespalib::make_string( "Trying to perform merge %s whose bucket belongs on target disk %d, which is down. Cluster state version of command is %d, our system state version is %d", - cmd->toString().c_str(), - entry->disk, - cmd->getClusterStateVersion(), + cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(), _component.getStateUpdater().getSystemState()->getVersion())); - LOGBT(debug, cmd->getBucketId().toString(), - "%s", code.getMessage().c_str()); + LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd)); reply->setResult(code); sendUp(reply); @@ -668,17 +630,13 @@ FileStorManager::onGetBucketDiff( } bool -FileStorManager::validateApplyDiffCommandBucket( - api::StorageMessage& msg, - const StorBucketDatabase::WrappedEntry& entry) +FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry& entry) { if (!entry.exist()) { return false; } if (!_component.getBucketDatabase().isConsistent(entry)) { - replyDroppedOperation(msg, - entry.getBucketId(), - api::ReturnCode::ABORTED, + replyDroppedOperation(msg, entry.getBucketId(), api::ReturnCode::ABORTED, "bucket became inconsistent during merging"); return false; } @@ -686,31 +644,26 @@ FileStorManager::validateApplyDiffCommandBucket( } bool -FileStorManager::validateDiffReplyBucket( - const StorBucketDatabase::WrappedEntry& entry, - const document::BucketId& bucket) +FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry, + const document::BucketId& bucket) { if (!entry.exist()) { _filestorHandler->clearMergeStatus(bucket, - api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, - "Bucket removed during merge")); + api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Bucket removed during merge")); return false; } if (!_component.getBucketDatabase().isConsistent(entry)) { _filestorHandler->clearMergeStatus(bucket, - api::ReturnCode(api::ReturnCode::ABORTED, - "Bucket became inconsistent during merging")); + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket became inconsistent during merging")); return false; } return true; } bool -FileStorManager::onGetBucketDiffReply( - const shared_ptr<api::GetBucketDiffReply>& reply) +FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *reply, reply->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId())); if (validateDiffReplyBucket(entry, reply->getBucketId())) { handlePersistenceMessage(reply, entry->disk); } @@ -718,11 +671,9 @@ FileStorManager::onGetBucketDiffReply( } bool -FileStorManager::onApplyBucketDiff( - const shared_ptr<api::ApplyBucketDiffCommand>& cmd) +FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (validateApplyDiffCommandBucket(*cmd, entry)) { handlePersistenceMessage(cmd, entry->disk); } @@ -730,8 +681,7 @@ FileStorManager::onApplyBucketDiff( } bool -FileStorManager::onApplyBucketDiffReply( - const shared_ptr<api::ApplyBucketDiffReply>& reply) +FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( *reply, reply->getBucketId())); @@ -742,8 +692,7 @@ FileStorManager::onApplyBucketDiffReply( } bool -FileStorManager::onJoinBuckets( - const std::shared_ptr<api::JoinBucketsCommand>& cmd) +FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get( cmd->getBucketId(), "FileStorManager::onJoinBuckets")); @@ -757,11 +706,9 @@ FileStorManager::onJoinBuckets( } bool -FileStorManager::onSplitBucket( - const std::shared_ptr<api::SplitBucketCommand>& cmd) +FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -772,8 +719,7 @@ bool FileStorManager::onSetBucketState( const std::shared_ptr<api::SetBucketStateCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -786,10 +732,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) switch (msg->getType()) { case GetIterCommand::ID: { - shared_ptr<GetIterCommand> cmd( - std::static_pointer_cast<GetIterCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -797,10 +741,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case CreateIteratorCommand::ID: { - shared_ptr<CreateIteratorCommand> cmd( - std::static_pointer_cast<CreateIteratorCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -808,28 +750,22 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case DestroyIteratorCommand::ID: { - spi::Context context(msg->getLoadType(), msg->getPriority(), - msg->getTrace().getLevel()); - shared_ptr<DestroyIteratorCommand> cmd( - std::static_pointer_cast<DestroyIteratorCommand>(msg)); + spi::Context context(msg->getLoadType(), msg->getPriority(), msg->getTrace().getLevel()); + shared_ptr<DestroyIteratorCommand> cmd(std::static_pointer_cast<DestroyIteratorCommand>(msg)); _provider->destroyIterator(cmd->getIteratorId(), context); msg->getTrace().getRoot().addChild(context.getTrace().getRoot()); return true; } case ReadBucketList::ID: { - shared_ptr<ReadBucketList> cmd( - std::static_pointer_cast<ReadBucketList>(msg)); - + shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg)); handlePersistenceMessage(cmd, cmd->getPartition()); return true; } case ReadBucketInfo::ID: { - shared_ptr<ReadBucketInfo> cmd( - std::static_pointer_cast<ReadBucketInfo>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -837,10 +773,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case InternalBucketJoinCommand::ID: { - shared_ptr<InternalBucketJoinCommand> cmd( - std::static_pointer_cast<InternalBucketJoinCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -848,10 +782,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case RepairBucketCommand::ID: { - shared_ptr<RepairBucketCommand> cmd( - std::static_pointer_cast<RepairBucketCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -859,10 +791,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case BucketDiskMoveCommand::ID: { - shared_ptr<BucketDiskMoveCommand> cmd( - std::static_pointer_cast<BucketDiskMoveCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -870,10 +800,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case RecheckBucketInfoCommand::ID: { - shared_ptr<RecheckBucketInfoCommand> cmd( - std::static_pointer_cast<RecheckBucketInfoCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *cmd, cmd->getBucketId())); + shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg)); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -881,8 +809,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } case AbortBucketOperationsCommand::ID: { - shared_ptr<AbortBucketOperationsCommand> cmd( - std::static_pointer_cast<AbortBucketOperationsCommand>(msg)); + shared_ptr<AbortBucketOperationsCommand> cmd(std::static_pointer_cast<AbortBucketOperationsCommand>(msg)); handleAbortBucketOperations(cmd); return true; } @@ -892,8 +819,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) } void -FileStorManager::handleAbortBucketOperations( - const shared_ptr<AbortBucketOperationsCommand>& cmd) +FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd) { _filestorHandler->abortQueuedOperations(*cmd); sendReply(api::StorageReply::SP(cmd->makeReply().release())); @@ -925,8 +851,7 @@ FileStorManager::sendReply(const std::shared_ptr<api::StorageReply>& reply) LOG(spam, "Sending reply %s", reply->toString().c_str()); if (reply->getType() == api::MessageType::INTERNAL_REPLY) { - std::shared_ptr<api::InternalReply> rep( - std::dynamic_pointer_cast<api::InternalReply>(reply)); + std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply)); assert(rep.get()); if (onInternalReply(rep)) return; } @@ -1021,8 +946,7 @@ FileStorManager::isMerging(const document::BucketId& bucket) const namespace { struct Deactivator { - StorBucketDatabase::Decision operator()( - document::BucketId::Type, StorBucketDatabase::Entry& data) + StorBucketDatabase::Decision operator()(document::BucketId::Type, StorBucketDatabase::Entry& data) { data.info.setActive(false); return StorBucketDatabase::UPDATE; @@ -1034,21 +958,16 @@ void FileStorManager::updateState() { lib::ClusterState::CSP state(_component.getStateUpdater().getSystemState()); - spi::ClusterState spiState( - *state, _component.getIndex(), *_component.getDistribution()); + spi::ClusterState spiState(*state, _component.getIndex(), *_component.getDistribution()); lib::Node node(_component.getNodeType(), _component.getIndex()); bool nodeUp = state->getNodeState(node).getState().oneOf("uir"); - LOG(debug, "FileStorManager received cluster state '%s'", - state->toString().c_str()); + LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str()); // If edge where we go down if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) { - LOG(debug, - "Received cluster state where this node is down; " - "de-activating all buckets in database"); + LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database"); Deactivator deactivator; - _component.getBucketDatabase().all( - deactivator, "FileStorManager::updateState"); + _component.getBucketDatabase().all(deactivator, "FileStorManager::updateState"); } _provider->setClusterState(spiState); _nodeUpInLastNodeStateSeenByProvider = nodeUp; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 0ab512cd63f..11f8b5d1cf4 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -5,7 +5,6 @@ #include "bucketownershipnotifier.h" #include "testandsethelper.h" #include <vespa/storageapi/message/multioperation.h> -#include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/document/fieldset/fieldsetrepo.h> @@ -24,15 +23,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex, - uint8_t lowestPriority, - bool startThread) - : _env(configUri, - compReg, - filestorHandler, - metrics, - deviceIndex, - lowestPriority, - provider), + uint8_t lowestPriority) + : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider), _warnOnSlowOperations(5000), _spi(provider), _processAllHandler(_env, provider), @@ -43,13 +35,10 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, _flushMonitor(), _closed(false) { - (void) startThread; std::ostringstream threadName; - threadName << "Disk " << _env._partition << " thread " - << (void*) this; + threadName << "Disk " << _env._partition << " thread " << (void*) this; _component.reset(new ServiceLayerComponent(compReg, threadName.str())); - _bucketOwnershipNotifier.reset( - new BucketOwnershipNotifier(*_component, filestorHandler)); + _bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler)); framework::MilliSecTime maxProcessingTime(60 * 1000); framework::MilliSecTime waitTime(1000); _thread = _component->startThread(*this, maxProcessingTime, waitTime); @@ -57,8 +46,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, PersistenceThread::~PersistenceThread() { - LOG(debug, "Shutting down persistence thread. Waiting for current " - "operation to finish."); + LOG(debug, "Shutting down persistence thread. Waiting for current operation to finish."); _thread->interrupt(); LOG(debug, "Waiting for thread to terminate."); _thread->join(); @@ -66,8 +54,7 @@ PersistenceThread::~PersistenceThread() } spi::Bucket -PersistenceThread::getBucket(const DocumentId& id, - const BucketId& bucket) const +PersistenceThread::getBucket(const DocumentId& id, const BucketId& bucket) const { BucketId docBucket(_env._bucketFactory.getBucketId(id)); docBucket.setUsedBits(bucket.getUsedBits()); @@ -82,8 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id, } bool -PersistenceThread::checkForError(const spi::Result& response, - MessageTracker& tracker) +PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker) { uint32_t code = _env.convertErrorCode(response); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 218d2f7dd23..7719f796e3d 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -25,8 +25,7 @@ public: FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex, - uint8_t lowestPriority, - bool startThread = false); + uint8_t lowestPriority); ~PersistenceThread(); /** Waits for current operation to be finished. */ @@ -72,22 +71,18 @@ private: vespalib::Monitor _flushMonitor; bool _closed; - void setBucketInfo(MessageTracker& tracker, - const document::BucketId& bucketId); + void setBucketInfo(MessageTracker& tracker, const document::BucketId& bucketId); - bool checkProviderBucketInfoMatches(const spi::Bucket&, - const api::BucketInfo&) const; + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - void updateBucketDatabase(const document::BucketId& id, - const api::BucketInfo& info); + void updateBucketDatabase(const document::BucketId& id, const api::BucketInfo& info); /** * Sanity-checking of join command parameters. Invokes tracker.fail() with * an appropriate error and returns false iff the command does not validate * OK. Returns true and does not touch the tracker otherwise. */ - bool validateJoinCommand(const api::JoinBucketsCommand& cmd, - MessageTracker& tracker) const; + bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const; // Message handling functions MessageTracker::UP handleCommand(api::StorageCommand&); @@ -102,8 +97,7 @@ private: bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const; - void flushAllReplies(const document::BucketId& bucketId, - std::vector<MessageTracker::UP>& trackers); + void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers); friend class TestAndSetHelper; bool tasConditionExists(const api::TestAndSetCommand & cmd); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index e2861ef42cd..9faf380d2ba 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -28,13 +28,11 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu namespace storage { -ServiceLayerNode::ServiceLayerNode( - const config::ConfigUri & configUri, - ServiceLayerNodeContext& context, - ApplicationGenerationFetcher& generationFetcher, - spi::PersistenceProvider& persistenceProvider, - const VisitorFactory::Map& externalVisitors) - : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)), +ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context, + ApplicationGenerationFetcher& generationFetcher, + spi::PersistenceProvider& persistenceProvider, + const VisitorFactory::Map& externalVisitors) + : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()), _context(context), _persistenceProvider(persistenceProvider), _partitions(0), diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp index 5f4ceff2a6b..2e1aa52e68d 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp @@ -4,16 +4,16 @@ #include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h> -using storage::framework::defaultimplementation::AllocationLogic; - namespace storage { +using framework::defaultimplementation::AllocationLogic; +using framework::defaultimplementation::PriorityMemoryLogic; + StorageNodeContext::StorageNodeContext(ComponentRegister::UP compReg, framework::Clock::UP clock) : _componentRegister(std::move(compReg)), _clock(std::move(clock)), _threadPool(*_clock), - _memoryLogic(new framework::defaultimplementation::PriorityMemoryLogic( - *_clock, 1024 * 1024 * 1024)), + _memoryLogic(new PriorityMemoryLogic(*_clock, 1024 * 1024 * 1024)), _memoryManager(AllocationLogic::UP(_memoryLogic)) { _componentRegister->setClock(*_clock); diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h index 7d6f3b0aef5..0149f975f63 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.h +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h @@ -53,8 +53,7 @@ struct StorageNodeContext { protected: // Initialization has been split in two as subclass needs to initialize // component register before sending it on. - StorageNodeContext(ComponentRegister::UP, - framework::Clock::UP); + StorageNodeContext(ComponentRegister::UP, framework::Clock::UP); private: ComponentRegister::UP _componentRegister; |