diff options
author | Henning Baldersheim <balder@oath.com> | 2018-06-07 16:47:01 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-06-12 09:56:59 +0200 |
commit | 498bd3df67182d16e021fccd5514c578838ba2bf (patch) | |
tree | b43692b3621746ff2617ff6ba871d8d459ef57e7 /storage | |
parent | 89521c8531b0eeb03bd28f899ecd59a9f8b334a6 (diff) |
Reduce code visibility
Diffstat (limited to 'storage')
8 files changed, 90 insertions, 179 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index d42330086f0..d12e6b90b2a 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -5,6 +5,7 @@ #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index fea4ae15efc..81b0293b0c0 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/test/make_document_bucket.h> using document::test::makeDocumentBucket; diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 9de304db497..fd880effda9 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -8,6 +8,7 @@ #include <vespa/storageapi/message/bucket.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/operations/external/updateoperation.h> #include <vespa/vespalib/testkit/test_kit.h> diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 396cc0d70de..f15921e447d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -1,13 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "persistencetestutils.h" #include <vespa/document/datatype/documenttype.h> #include <vespa/storageapi/message/persistence.h> -#include <tests/persistence/persistencetestutils.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> #include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 22ab954cc6a..34d4c397f09 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -145,11 +145,11 @@ public: bool headerOnly) { return doGetOnDisk(0, bucketId, docId, headerOnly); } - document::DocumentUpdate::SP createBodyUpdate( + std::shared_ptr<document::DocumentUpdate> createBodyUpdate( const document::DocumentId& id, const document::FieldValue& updateValue); - document::DocumentUpdate::SP createHeaderUpdate( + std::shared_ptr<document::DocumentUpdate> createHeaderUpdate( const document::DocumentId& id, const document::FieldValue& updateValue); @@ -172,7 +172,7 @@ public: uint16_t disk = 0); spi::UpdateResult doUpdate(document::BucketId bid, - const document::DocumentUpdate::SP& update, + const std::shared_ptr<document::DocumentUpdate>& update, spi::Timestamp time, uint16_t disk = 0); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index caf24c4a8fe..c729df1e7eb 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -6,6 +6,7 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> #include <vespa/document/fieldvalue/fieldvalues.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/persistence/spi/test.h> #include <functional> diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 170b5385f72..7df598bed97 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -13,6 +13,7 @@ #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/storageserver/changedbucketownershiphandler.h> diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 768e104e02b..0eeb4eeb84d 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -7,6 +7,7 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/document/fieldset/fieldsetrepo.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> @@ -192,9 +193,8 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); api::BucketInfo before = _env.getBucketInfo(cmd.getBucket()); - spi::Result result = - _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - cmd.verifyBody() ? spi::HIGH : spi::LOW); + spi::Result result = _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), + cmd.verifyBody() ? spi::HIGH : spi::LOW); if (checkForError(result, *tracker)) { api::BucketInfo after = _env.getBucketInfo(cmd.getBucket()); @@ -229,8 +229,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - LOG(warning, "Bucket %s was merging at create time. Unexpected.", - cmd.getBucketId().toString().c_str()); + LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); @@ -250,22 +249,18 @@ bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::Bucke // where Proton starts reporting actual document sizes, and will eventually // converge to a stable value. But for now, ignore it to prevent false positive // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) - && (a.getDocumentCount() == b.getDocumentCount())); + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); } } bool -PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, - const api::BucketInfo& info) const +PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const { spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); if (result.hasError()) { - LOG(error, - "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), - result.getErrorMessage().c_str()); + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); return false; } api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo())); @@ -280,9 +275,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, "bucket has %s. Deletion has been rejected to ensure data is not " "lost, but bucket may remain out of sync until service has been " "restarted.", - bucket.toString().c_str(), - info.toString().c_str(), - providerInfo.toString().c_str()); + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); return false; } return true; @@ -387,9 +380,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) MessageTracker::UP PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.splitBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); // Calculate the various bucket ids involved. @@ -400,17 +391,15 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, - "Max lit bits must be set higher " - "than the number of bits used in the bucket to split"); + "Max lit bits must be set higher than the number of bits used in the bucket to split"); return tracker; } spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { - targetInfo = SplitBitDetector::detectSplit( - _spi, spiBucket, cmd.getMaxSplitBits(), - _context, cmd.getMinDocCount(), cmd.getMinByteSize()); + targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), + _context, cmd.getMinDocCount(), cmd.getMinByteSize()); } if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); @@ -449,13 +438,10 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } } #endif - spi::Result result = _spi.split( - spiBucket, - spi::Bucket(target1, spi::PartitionId(lock1.disk)), - spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context); + spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)), + spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context); if (result.hasError()) { - tracker->fail(_env.convertErrorCode(result), - result.getErrorMessage()); + tracker->fail(_env.convertErrorCode(result), result.getErrorMessage()); return tracker; } // After split we need to take all bucket db locks to update them. @@ -466,8 +452,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) api::SplitBucketReply & splitReply = *reply; tracker->setReply(std::move(reply)); - typedef std::pair<StorBucketDatabase::WrappedEntry, - FileStorHandler::RemapInfo> TargetInfo; + typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; std::vector<TargetInfo> targets; for (uint32_t i = 0; i < 2; i++) { const document::Bucket &target(i == 0 ? target1 : target2); @@ -478,8 +463,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) target.getBucketId(), "PersistenceThread::handleSplitBucket - Target", StorBucketDatabase::CREATE_IF_NONEXISTING), FileStorHandler::RemapInfo(target, disk))); - targets.back().first->setBucketInfo( - _env.getBucketInfo(target, disk)); + targets.back().first->setBucketInfo(_env.getBucketInfo(target, disk)); targets.back().first->disk = disk; } if (LOG_WOULD_LOG(spam)) { @@ -494,41 +478,30 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) targ2.getMetaCount()); } FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition); - _env._fileStorHandler.remapQueueAfterSplit( - source, targets[0].second, targets[1].second); - bool ownershipChanged( - !_bucketOwnershipNotifier->distributorOwns( - cmd.getSourceIndex(), cmd.getBucket())); + _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second); + bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket())); // Now release all the bucketdb locks. for (uint32_t i = 0; i < targets.size(); i++) { if (ownershipChanged) { - notifyGuard.notifyAlways(targets[i].second.bucket, - targets[i].first->getBucketInfo()); + notifyGuard.notifyAlways(targets[i].second.bucket, targets[i].first->getBucketInfo()); } // The entries vector has the source bucket in element zero, so indexing // that with i+1 - if (targets[i].second.foundInQueue - || targets[i].first->getMetaCount() > 0) - { + if (targets[i].second.foundInQueue || targets[i].first->getMetaCount() > 0) { if (targets[i].first->getMetaCount() == 0) { // Fake that the bucket has content so it is not deleted. targets[i].first->info.setMetaCount(1); // Must make sure target bucket exists when we have pending ops // to an empty target bucket, since the provider will have // implicitly erased it by this point. - spi::Bucket createTarget( - spi::Bucket(targets[i].second.bucket, - spi::PartitionId(targets[i].second.diskIndex))); - LOG(debug, - "Split target %s was empty, but re-creating it since " - "there are remapped operations queued to it", + spi::Bucket createTarget(spi::Bucket(targets[i].second.bucket, + spi::PartitionId(targets[i].second.diskIndex))); + LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); _spi.createBucket(createTarget, _context); } - splitReply.getSplitInfo().push_back( - api::SplitBucketReply::Entry( - targets[i].second.bucket.getBucketId(), - targets[i].first->getBucketInfo())); + splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(), + targets[i].first->getBucketInfo()); targets[i].first.write(); } else { targets[i].first.remove(); @@ -536,8 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } if (sourceEntry.exist()) { if (ownershipChanged) { - notifyGuard.notifyAlways(cmd.getBucket(), - sourceEntry->getBucketInfo()); + notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo()); } // Delete the old entry. sourceEntry.remove(); @@ -546,30 +518,24 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } bool -PersistenceThread::validateJoinCommand( - const api::JoinBucketsCommand& cmd, - MessageTracker& tracker) const +PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const { if (cmd.getSourceBuckets().size() != 2) { tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Join needs exactly two buckets to be joined together" - + cmd.getBucketId().toString()); + "Join needs exactly two buckets to be joined together" + cmd.getBucketId().toString()); return false; } // Verify that source and target buckets look sane. for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { if (cmd.getSourceBuckets()[i] == cmd.getBucketId()) { tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Join had both source and target bucket " - + cmd.getBucketId().toString()); + "Join had both source and target bucket " + cmd.getBucketId().toString()); return false; } if (!cmd.getBucketId().contains(cmd.getSourceBuckets()[i])) { tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Source bucket " + - cmd.getSourceBuckets()[i].toString() - + " is not contained in target " - + cmd.getBucketId().toString()); + "Source bucket " + cmd.getSourceBuckets()[i].toString() + + " is not contained in target " + cmd.getBucketId().toString()); return false; } } @@ -579,9 +545,7 @@ PersistenceThread::validateJoinCommand( MessageTracker::UP PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.joinBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock()); if (!validateJoinCommand(cmd, *tracker)) { return tracker; } @@ -594,11 +558,8 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) { // Create empty bucket for target. StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(destBucket.getBucketSpace()).get( - destBucket.getBucketId(), - "join", - StorBucketDatabase::CREATE_IF_NONEXISTING); - + _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", + StorBucketDatabase::CREATE_IF_NONEXISTING); entry->disk = _env._partition; entry.write(); } @@ -644,13 +605,10 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); uint16_t disk = (i == 0) ? lock1.disk : lock2.disk; FileStorHandler::RemapInfo target(cmd.getBucket(), _env._partition); - _env._fileStorHandler.remapQueueAfterJoin( - FileStorHandler::RemapInfo(srcBucket, disk), - target); + _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket, disk), target); // Remove source from bucket db. - StorBucketDatabase::WrappedEntry entry( - _env.getBucketDatabase(srcBucket.getBucketSpace()).get( - srcBucket.getBucketId(), "join-remove-source")); + StorBucketDatabase::WrappedEntry entry = + _env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source"); if (entry.exist()) { lastModified = std::max(lastModified, entry->info.getLastModified()); entry.remove(); @@ -658,13 +616,10 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) } { StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(destBucket.getBucketSpace()).get( - destBucket.getBucketId(), - "join", - StorBucketDatabase::CREATE_IF_NONEXISTING); + _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", + StorBucketDatabase::CREATE_IF_NONEXISTING); if (entry->info.getLastModified() == 0) { - entry->info.setLastModified( - std::max(lastModified, entry->info.getLastModified())); + entry->info.setLastModified(std::max(lastModified, entry->info.getLastModified())); } entry.write(); } @@ -674,28 +629,21 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) MessageTracker::UP PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.setBucketStates, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState( - shouldBeActive - ? spi::BucketInfo::ACTIVE - : spi::BucketInfo::NOT_ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); spi::Result result(_spi.setActiveState(bucket, newState)); if (checkForError(result, *tracker)) { - StorBucketDatabase::WrappedEntry entry(_env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get( - cmd.getBucketId(), "handleSetBucketState")); + StorBucketDatabase::WrappedEntry + entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); if (entry.exist()) { entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), - cmd.getSourceIndex(), - entry->info); + notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); entry.write(); } else { LOG(warning, "Got OK setCurrentState result from provider for %s, " @@ -703,8 +651,7 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) cmd.getBucketId().toString().c_str()); } - tracker->setReply(api::StorageReply::SP( - new api::SetBucketStateReply(cmd))); + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); } return tracker; @@ -713,17 +660,13 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) MessageTracker::UP PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.internalJoin, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock()); document::Bucket destBucket = cmd.getBucket(); { // Create empty bucket for target. StorBucketDatabase::WrappedEntry entry = _env.getBucketDatabase(destBucket.getBucketSpace()).get( - destBucket.getBucketId(), - "join", - StorBucketDatabase::CREATE_IF_NONEXISTING); + destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING); entry->disk = _env._partition; entry.write(); @@ -734,10 +677,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())), _context); if (checkForError(result, *tracker)) { - tracker->setReply( - api::StorageReply::SP( - new InternalBucketJoinReply(cmd, - _env.getBucketInfo(cmd.getBucket())))); + tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); } return tracker; } @@ -745,17 +685,14 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) MessageTracker::UP PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.recheckBucketInfo, _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock()); document::Bucket bucket(cmd.getBucket()); api::BucketInfo info(_env.getBucketInfo(bucket)); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); { // Update bucket database StorBucketDatabase::WrappedEntry entry( - _component->getBucketDatabase(bucket.getBucketSpace()).get( - bucket.getBucketId(), - "handleRecheckBucketInfo")); + _component->getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo")); if (entry.exist()) { api::BucketInfo prevInfo(entry->getBucketInfo()); @@ -799,30 +736,23 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg)); // Depends on iterators case api::MessageType::STATBUCKET_ID: - return _processAllHandler.handleStatBucket( - static_cast<api::StatBucketCommand&>(msg), _context); + return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), _context); case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation( - static_cast<api::RemoveLocationCommand&>(msg), _context); + return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), _context); case api::MessageType::MERGEBUCKET_ID: - return _mergeHandler.handleMergeBucket( - static_cast<api::MergeBucketCommand&>(msg), _context); + return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), _context); case api::MessageType::GETBUCKETDIFF_ID: - return _mergeHandler.handleGetBucketDiff( - static_cast<api::GetBucketDiffCommand&>(msg), _context); + return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), _context); case api::MessageType::APPLYBUCKETDIFF_ID: - return _mergeHandler.handleApplyBucketDiff( - static_cast<api::ApplyBucketDiffCommand&>(msg), _context); + return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), _context); case api::MessageType::SETBUCKETSTATE_ID: - return handleSetBucketState( - static_cast<api::SetBucketStateCommand&>(msg)); + return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: return handleGetIter(static_cast<GetIterCommand&>(msg)); case CreateIteratorCommand::ID: - return handleCreateIterator( - static_cast<CreateIteratorCommand&>(msg)); + return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg)); case ReadBucketList::ID: return handleReadBucketList(static_cast<ReadBucketList&>(msg)); case ReadBucketInfo::ID: @@ -830,18 +760,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) case RepairBucketCommand::ID: return handleRepairBucket(static_cast<RepairBucketCommand&>(msg)); case BucketDiskMoveCommand::ID: - return _diskMoveHandler.handleBucketDiskMove( - static_cast<BucketDiskMoveCommand&>(msg), _context); + return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), _context); case InternalBucketJoinCommand::ID: - return handleInternalBucketJoin( - static_cast<InternalBucketJoinCommand&>(msg)); + return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg)); case RecheckBucketInfoCommand::ID: - return handleRecheckBucketInfo( - static_cast<RecheckBucketInfoCommand&>(msg)); + return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg)); default: - LOG(warning, - "Persistence thread received unhandled internal command %s", - msg.toString().c_str()); + LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str()); break; } default: @@ -853,13 +778,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) MessageTracker::UP PersistenceThread::handleCommand(api::StorageCommand& msg) { - _context = spi::Context(msg.getLoadType(), msg.getPriority(), - msg.getTrace().getLevel()); + _context = spi::Context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); MessageTracker::UP mtracker(handleCommandSplitByType(msg)); if (mtracker.get() != 0) { if (mtracker->getReply().get() != 0) { - mtracker->getReply()->getTrace().getRoot().addChild( - _context.getTrace().getRoot()); + mtracker->getReply()->getTrace().getRoot().addChild(_context.getTrace().getRoot()); } else { msg.getTrace().getRoot().addChild(_context.getTrace().getRoot()); } @@ -872,14 +795,10 @@ PersistenceThread::handleReply(api::StorageReply& reply) { switch (reply.getType().getId()) { case api::MessageType::GETBUCKETDIFF_REPLY_ID: - _mergeHandler.handleGetBucketDiffReply( - static_cast<api::GetBucketDiffReply&>(reply), - _env._fileStorHandler); + _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - _mergeHandler.handleApplyBucketDiffReply( - static_cast<api::ApplyBucketDiffReply&>(reply), - _env._fileStorHandler); + _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); break; default: break; @@ -889,8 +808,7 @@ PersistenceThread::handleReply(api::StorageReply& reply) MessageTracker::UP PersistenceThread::processMessage(api::StorageMessage& msg) { - MBUS_TRACE(msg.getTrace(), 5, - "PersistenceThread: Processing message in persistence layer"); + MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer"); ++_env._metrics.operations; if (msg.getType().isReply()) { @@ -903,8 +821,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg) LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); } } else { - api::StorageCommand& initiatingCommand = - static_cast<api::StorageCommand&>(msg); + api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); try { int64_t startTime(_component->getClock().getTimeInMillis().getTime()); @@ -913,8 +830,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg) LOG(spam, "Message content: %s", msg.toString(true).c_str()); auto tracker(handleCommand(initiatingCommand)); if (!tracker.get()) { - LOG(debug, "Received unsupported command %s", - msg.getType().getName().c_str()); + LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); } else { tracker->generateReply(initiatingCommand); if ((tracker->getReply().get() @@ -925,29 +841,21 @@ PersistenceThread::processMessage(api::StorageMessage& msg) } } - int64_t stopTime( - _component->getClock().getTimeInMillis().getTime()); + int64_t stopTime(_component->getClock().getTimeInMillis().getTime()); if (stopTime - startTime >= _warnOnSlowOperations) { LOGBT(warning, msg.getType().toString(), - "Slow processing of message %s on disk %u. " - "Processing time: %" PRId64 " ms (>=%d ms)", - msg.toString().c_str(), _env._partition, - stopTime - startTime, _warnOnSlowOperations); + "Slow processing of message %s on disk %u. Processing time: %" PRId64 " ms (>=%d ms)", + msg.toString().c_str(), _env._partition, stopTime - startTime, _warnOnSlowOperations); } else { - LOGBT(spam, msg.getType().toString(), - "Processing time of message %s on disk %u: %" PRId64 " ms", - msg.toString(true).c_str(), _env._partition, - stopTime - startTime); + LOGBT(spam, msg.getType().toString(), "Processing time of message %s on disk %u: %" PRId64 " ms", + msg.toString(true).c_str(), _env._partition, stopTime - startTime); } return tracker; } catch (std::exception& e) { - LOG(debug, "Caught exception for %s: %s", - msg.toString().c_str(), - e.what()); + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); api::StorageReply::SP reply(initiatingCommand.makeReply().release()); - reply->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, e.what())); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); _env._fileStorHandler.sendReply(reply); } } @@ -1016,8 +924,7 @@ PersistenceThread::flushAllReplies( uint32_t errorCode = _env.convertErrorCode(result); if (errorCode != 0) { for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult( - api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); + replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); } } } catch (std::exception& e) { @@ -1041,8 +948,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) document::Bucket bucket = lock.first->getBucket(); while (lock.second) { - LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", - _env._partition, _env._nodeIndex, lock.second.get()); + LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); std::shared_ptr<api::StorageMessage> msg(lock.second); bool batchable = isBatchable(*msg); @@ -1077,7 +983,6 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } else { LOG(spam, "Sending reply up: %s %zu", tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(tracker->getReply()); break; } |