diff options
author | Henning Baldersheim <balder@oath.com> | 2018-03-19 11:52:04 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-03-19 13:01:17 +0100 |
commit | b9c175537bc743a0bf457d579d03b59867d1a1f1 (patch) | |
tree | aed9fcb4c0ceeb66101898c2fc413821daa2c244 | |
parent | fcbaf85bc896b2664a64bda95f47c2e45f64a08b (diff) |
- Use num_threads directly as it has a sane default.
- Use std::make_ insteda of explicit new.
- std::shared_ptr can be assigned from std::unque_ptr. Avoid using release().
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp | 160 |
1 files changed, 55 insertions, 105 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 604acebcf38..b4735c2961a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -29,10 +29,8 @@ using document::BucketSpace; namespace storage { FileStorManager:: -FileStorManager(const config::ConfigUri & configUri, - const spi::PartitionStateList& partitions, - spi::PersistenceProvider& provider, - ServiceLayerComponentRegister& compReg) +FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions, + spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg) : StorageLinkQueued("File store manager", compReg), framework::HtmlStatusReporter("filestorman", "File store manager"), _compReg(compReg), @@ -83,8 +81,7 @@ FileStorManager::~FileStorManager() } } } - LOG(debug, "Closing all filestor queues, answering queued messages. " - "New messages will be refused."); + LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused."); _filestorHandler->close(); LOG(debug, "Deleting filestor threads. Waiting for their current operation " "to finish. Stop their threads and delete objects."); @@ -116,7 +113,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _disks.resize(_component.getDiskCount()); - size_t numThreads = (_config->numThreads) ? 6 : _config->numThreads; + size_t numThreads = _config->numThreads; _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads); _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg)); @@ -124,10 +121,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC if (_partitions[i].isUp()) { LOG(spam, "Setting up disk %u", i); for (uint32_t j = 0; j < numThreads; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i))); - + _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i)); } } else { _filestorHandler->disable(i); @@ -137,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC } void -FileStorManager::replyDroppedOperation(api::StorageMessage& msg, - const document::Bucket& bucket, - api::ReturnCode::Result returnCode, - vespalib::stringref reason) +FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket, + api::ReturnCode::Result returnCode, vespalib::stringref reason) { std::ostringstream error; error << "Dropping " << msg.getType() << " to bucket " << bucket.toString() << ". Reason: " << reason; LOGBT(debug, bucket.toString(), "%s", error.str().c_str()); if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply(); reply->setResult(api::ReturnCode(returnCode, error.str())); sendUp(reply); } } void -FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket) { - replyDroppedOperation(msg, - bucket, - api::ReturnCode::BUCKET_NOT_FOUND, - "bucket does not exist"); + replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist"); } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToDisk(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::mapOperationToDisk")); @@ -177,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg, } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, - const document::DocumentId* docId) +FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId) { StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace()); StorBucketDatabase::WrappedEntry entry(database.get( @@ -188,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, if (docId) { specific = _bucketIdFactory.getBucketId(*docId); } - typedef std::map<document::BucketId, - StorBucketDatabase::WrappedEntry> BucketMap; + typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap; std::shared_ptr<api::StorageReply> reply; { - BucketMap results( - database.getContained( - specific, "FileStorManager::mapOperationToBucketAndDisk-2")); + BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2")); if (results.size() == 1) { - LOG(debug, - "Remapping %s operation to specific %s versus " - "non-existing %s to %s.", + LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.", cmd.toString().c_str(), specific.toString().c_str(), cmd.getBucketId().toString().c_str(), results.begin()->first.toString().c_str()); @@ -223,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str()); - reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release()); - reply->setResult( - api::ReturnCode( - api::ReturnCode::BUCKET_NOT_FOUND, error.str())); + reply = static_cast<api::StorageCommand&>(cmd).makeReply(); + reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str())); } sendUp(reply); } @@ -234,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } bool -FileStorManager::handlePersistenceMessage( - const shared_ptr<api::StorageMessage>& msg, uint16_t disk) +FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk) { api::ReturnCode errorCode(api::ReturnCode::OK); do { - LOG(spam, "Received %s. Attempting to queue it to disk %u.", - msg->getType().getName().c_str(), disk); + LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk); LOG_BUCKET_OPERATION_NO_LOCK( getStorageMessageBucket(*msg).getBucketId(), - vespalib::make_string("Attempting to queue %s to disk %u", - msg->toString().c_str(), disk)); + vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); if (_filestorHandler->schedule(msg, disk)) { @@ -255,12 +231,10 @@ FileStorManager::handlePersistenceMessage( } switch (_filestorHandler->getDiskState(disk)) { case FileStorHandler::DISABLED: - errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, - "Disk disabled"); + errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"); break; case FileStorHandler::CLOSED: - errorCode = api::ReturnCode(api::ReturnCode::ABORTED, - "Shutting down storage node."); + errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."); break; case FileStorHandler::AVAILABLE: assert(false); @@ -269,8 +243,7 @@ FileStorManager::handlePersistenceMessage( // If we get here, we failed to schedule message. errorCode says why // We need to reply to message (while not having bucket lock) if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply(); reply->setResult(errorCode); LOG(spam, "Received persistence message %s. Returning reply: %s", msg->getType().getName().c_str(), errorCode.toString().c_str()); @@ -283,7 +256,7 @@ bool FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Put command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -291,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -303,7 +275,7 @@ bool FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Update command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -311,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -322,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) bool FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -334,7 +304,7 @@ bool FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Remove command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -342,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -353,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) bool FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, 0)); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -403,12 +371,10 @@ FileStorManager::onCreateBucket( bucket.getBucketId(), "FileStorManager::onCreateBucket", StorBucketDatabase::CREATE_IF_NONEXISTING)); if (entry.preExisted()) { - LOG(debug, - "Got create bucket request for %s which already exists: %s", + LOG(debug, "Got create bucket request for %s which already exists: %s", cmd->getBucketId().toString().c_str(), entry->getBucketInfo().toString().c_str()); - code = api::ReturnCode(api::ReturnCode::EXISTS, - "Bucket already exist"); + code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist"); } else { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { @@ -425,12 +391,9 @@ FileStorManager::onCreateBucket( return true; } else { entry.remove(); - code = api::ReturnCode( - api::ReturnCode::IO_FAILURE, - vespalib::make_string( - "Trying to create bucket %s on disabled disk %d", - cmd->getBucketId().toString().c_str(), - entry->disk)); + code = api::ReturnCode(api::ReturnCode::IO_FAILURE, + vespalib::make_string("Trying to create bucket %s on disabled disk %d", + cmd->getBucketId().toString().c_str(), entry->disk)); } } } @@ -451,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) "FileStorManager::onDeleteBucket")); if (!entry.exist()) { LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); sendUp(reply); return true; } @@ -473,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) << entry->getBucketInfo().toString(); LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); entry.unlock(); @@ -523,8 +486,7 @@ FileStorManager::ensureConsistentBucket( bool FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, - "FileStorManager::onMergeBucket")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket")); if (!entry.exist()) { return true; } @@ -547,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(), _component.getStateUpdater().getClusterStateBundle()->getVersion())); LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); - api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd)); + auto reply = std::make_shared<api::MergeBucketReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -562,39 +524,30 @@ bool FileStorManager::onGetBucketDiff( const shared_ptr<api::GetBucketDiffCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry( - ensureConsistentBucket(cmd->getBucket(), - *cmd, - "FileStorManager::onGetBucketDiff")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff")); if (!entry.exist()) { return true; } if (!entry.preExisted()) { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { - LOG(debug, "Created bucket %s on disk %d (node index is %d) due " - "to get bucket diff being received.", - cmd->getBucketId().toString().c_str(), - entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.", + cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); entry->info.setTotalDocumentSize(0); entry->info.setUsedFileSize(0); entry->info.setReady(true); - // Call before writing bucket entry as we need to have bucket - // lock while calling + // Call before writing bucket entry as we need to have bucket + // lock while calling handlePersistenceMessage(cmd, entry->disk); entry.write(); } else { entry.remove(); api::ReturnCode code(api::ReturnCode::IO_FAILURE, vespalib::make_string( - "Trying to merge non-existing bucket %s, which " - "can't be created because target disk %d is down", - cmd->getBucketId().toString().c_str(), - entry->disk)); - LOGBT(warning, cmd->getBucketId().toString(), - "%s", code.getMessage().c_str()); - api::GetBucketDiffReply::SP reply( - new api::GetBucketDiffReply(*cmd)); + "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down", + cmd->getBucketId().toString().c_str(), entry->disk)); + LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); + auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -614,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const BucketSpace bucketSpace(msg.getBucket().getBucketSpace()); if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) { document::Bucket bucket(bucketSpace, entry.getBucketId()); - replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, - "bucket became inconsistent during merging"); + replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging"); return false; } return true; @@ -695,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c } bool -FileStorManager::onSetBucketState( - const std::shared_ptr<api::SetBucketStateCommand>& cmd) +FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { @@ -801,7 +752,7 @@ void FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd) { _filestorHandler->abortQueuedOperations(*cmd); - sendReply(api::StorageReply::SP(cmd->makeReply().release())); + sendReply(api::StorageReply::SP(cmd->makeReply())); } bool @@ -893,8 +844,7 @@ void FileStorManager::onFlush(bool downwards) } void -FileStorManager::reportHtmlStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const { bool showStatus = !path.hasAttribute("thread"); bool verbose = path.hasAttribute("verbose"); @@ -942,7 +892,7 @@ FileStorManager::updateState() bool nodeUp = state->getNodeState(node).getState().oneOf("uir"); LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str()); - // If edge where we go down + // If edge where we go down if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database"); Deactivator deactivator; |