diff options
author | Henning Baldersheim <balder@oath.com> | 2018-03-16 18:39:07 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-03-19 13:01:17 +0100 |
commit | 3790a0da41ddd043c5a4e561a1166b6a9bcceaaf (patch) | |
tree | e14d9f35030f6de2c7771304682b3ba987a62234 | |
parent | b5de45250c79c35635b67ecfccdc181a910ded20 (diff) |
std::make_ instead of 'new'.
Conflicts:
storage/src/vespa/storage/persistence/persistencethread.cpp
Resolve merge conflict with removed code.
3 files changed, 95 insertions, 239 deletions
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 8636628c7cc..b53c8c270f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,21 +14,16 @@ using namespace storage::api; namespace storage { -StorageLink::~StorageLink() -{ -} +StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { if (_state != CREATED) { - LOG(error, "Attempted to alter chain by adding link %s after link %s " - "while state is %s", - link->toString().c_str(), - toString().c_str(), - stateToString(_state)); + LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", + link->toString().c_str(), toString().c_str(), stateToString(_state)); assert(false); } - assert(link.get()); + assert(link); if (isBottom()) { link->_up = this; _down = std::move(link); @@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link) void StorageLink::open() { - // First tag all states as opened, as components are allowed to send - // messages both ways in onOpen call, in case any component send message - // up, the link receiving them should have their state as opened. + // First tag all states as opened, as components are allowed to send + // messages both ways in onOpen call, in case any component send message + // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { if (link->_state != CREATED) { - LOG(error, "During open(), link %s should be in CREATED state, " - "not in state %s.", - toString().c_str(), - stateToString(link->_state)); + LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", + toString().c_str(), stateToString(link->_state)); assert(false); } link->_state = OPENED; if (link->_down.get() == 0) break; link = link->_down.get(); } - // When give all links an onOpen call, bottoms up. Do it bottoms up, as - // links are more likely to send messages down in their onOpen() call - // than up. Thus, chances are best that the component is ready to - // receive messages sent during onOpen(). + // When give all links an onOpen call, bottoms up. Do it bottoms up, as + // links are more likely to send messages down in their onOpen() call + // than up. Thus, chances are best that the component is ready to + // receive messages sent during onOpen(). while (link != 0) { link->onOpen(); link = link->_up; @@ -91,34 +84,31 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { if (_state != CLOSING) { - LOG(error, "During flush(), link %s should be in CLOSING state, " - "not in state %s.", - toString().c_str(), - stateToString(_state)); + LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", + toString().c_str(), stateToString(_state)); assert(false); } - // First flush down to get all requests out of the system. + // First flush down to get all requests out of the system. _state = FLUSHINGDOWN; LOG(debug, "Flushing link %s on the way down.", toString().c_str()); onFlush(true); LOG(debug, "Flushed link %s on the way down.", toString().c_str()); if (!isBottom()) { _down->flush(); - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } else { - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } _state = CLOSED; - LOG(debug, "Link %s is now closed and should do nothing more.", - toString().c_str()); + LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str()); } void StorageLink::sendDown(const StorageMessage::SP& msg) @@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) case FLUSHINGDOWN: break; default: - LOG(error, - "Link %s trying to send %s down while in state %s", - toString().c_str(), - msg->toString().c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s down while in state %s", + toString().c_str(), msg->toString().c_str(), stateToString(_state)); assert(false); } assert(msg.get()); - LOG(spam, "Storage Link %s to handle %s", - toString().c_str(), msg->toString().c_str()); + LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { - LOG(spam, "Storage link %s at bottom of chain got message %s.", - toString().c_str(), msg->toString().c_str()); - /* - if (isFlush(msg)) { + LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); + ostringstream ost; + ost << "Unhandled message at bottom of chain " << *msg << " (message type " + << msg->getType().getName() << "). " << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendUp(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at bottom of chain " - << *msg << " (message type " - << msg->getType().getName() - << "). " - << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendUp(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_down->onDown(msg)) { - //LOG(spam, "Storage link %s forwarding message %s.", - // toString().c_str(), msg->toString().c_str()); _down->sendDown(msg); } else { LOG(spam, "Storage link %s handled message %s.", @@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { - // Verify acceptable state to send messages up + // Verify acceptable state to send messages up switch(_state) { case OPENED: case CLOSING: @@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) case FLUSHINGUP: break; default: - LOG(error, - "Link %s trying to send %s up while in state %s", - toString().c_str(), - msg->toString(true).c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s up while in state %s", + toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); assert(false); } assert(msg.get()); if (isTop()) { - /* - if (isFlush(msg)) { + ostringstream ost; + ost << "Unhandled message at top of chain " << *msg << "."; + ost << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendDown(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at top of chain " << *msg << "."; - ost << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendDown(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_up->onUp(msg)) { _up->sendUp(msg); } @@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const { bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg) { - //LOG(spam, "Checking if storage link %s handles %s.", - // toString().c_str(), msg->toString().c_str()); - bool result = msg->callHandler(*this, msg); - /* - if (result) { - LOG(spam, "Storage link %s handled message %s.", - toString().c_str(), msg->toString().c_str()); - } else { - LOG(spam, "Storage link %s did not handle message %s.", - toString().c_str(), msg->toString().c_str()); - } - */ - return result; + return msg->callHandler(*this, msg); } bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 75f389815df..5eb168a9a42 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -559,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk) assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); - LOG(spam, - "Acquiring filestor lock for %s on disk %d", - bucket.getBucketId().toString().c_str(), - disk); + LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk); vespalib::MonitorGuard lockGuard(t.lock); while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) { - LOG(spam, - "Contending for filestor lock for %s", - bucket.getBucketId().toString().c_str()); + LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); lockGuard.wait(100); } - std::shared_ptr<FileStorHandler::BucketLockInterface> locker( - new BucketLock(lockGuard, t, bucket, 255, "External lock")); + auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock"); lockGuard.broadcast(); return locker; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index fded9c87978..be6f9577642 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -9,7 +9,6 @@ #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> -#include <algorithm> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.thread"); @@ -105,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker::UP PersistenceThread::handlePut(api::PutCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.put[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::Result response = - _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocument(), - _context); + spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context); checkForError(response, *tracker); return tracker; } @@ -125,18 +119,14 @@ PersistenceThread::handlePut(api::PutCommand& cmd) MessageTracker::UP PersistenceThread::handleRemove(api::RemoveCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.remove[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::RemoveResult response = - _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocumentId(), _context); + spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context); if (checkForError(response, *tracker)) { tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } @@ -149,18 +139,14 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd) MessageTracker::UP PersistenceThread::handleUpdate(api::UpdateCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.update[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::UpdateResult response = - _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getUpdate(), _context); + spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context); if (checkForError(response, *tracker)) { auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); @@ -172,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) MessageTracker::UP PersistenceThread::handleGet(api::GetCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.get[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFieldSet()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), - *fieldSet, - cmd.getDocumentId(), - _context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { ++_env._metrics.get[cmd.getLoadType()].notFound; } - - api::GetReply::UP reply( - new api::GetReply(cmd, - Document::SP(result.getDocumentPtr()), - result.getTimestamp())); - - tracker->setReply(api::StorageReply::SP(reply.release())); + tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp())); } return tracker; @@ -204,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd) MessageTracker::UP PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.repairs, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - LOG(debug, "Repair(%s): %s", - cmd.getBucketId().toString().c_str(), + LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); api::BucketInfo before = _env.getBucketInfo(cmd.getBucket()); spi::Result result = - _spi.maintain(spi::Bucket(cmd.getBucket(), - spi::PartitionId(_env._partition)), - cmd.verifyBody() ? - spi::HIGH : spi::LOW); + _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), + cmd.verifyBody() ? spi::HIGH : spi::LOW); if (checkForError(result, *tracker)) { api::BucketInfo after = _env.getBucketInfo(cmd.getBucket()); @@ -236,9 +205,7 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleRevert(api::RevertCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.revert[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { @@ -250,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd) MessageTracker::UP PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", @@ -294,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, result.getErrorMessage().c_str()); return false; } - api::BucketInfo providerInfo( - _env.convertBucketInfo(result.getBucketInfo())); + api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo())); // Don't check meta fields or active/ready fields since these are not // that important and ready may change under the hood in a race with // getModifiedBuckets(). If bucket is empty it means it has already @@ -318,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, MessageTracker::UP PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.deleteBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, - "Bucket was deleted during the merge")); + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { @@ -335,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) _spi.deleteBucket(bucket, _context); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { - StorBucketDatabase::WrappedEntry entry(db.get( - cmd.getBucketId(), "FileStorThread::onDeleteBucket")); + StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); if (entry.exist() && entry->getMetaCount() > 0) { LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " "active operation when delete bucket was queued. " @@ -360,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleGetIter(GetIterCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.visit[cmd.getLoadType()], - _env._component.getClock())); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), - cmd.getMaxByteSize(), _context)); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context)); if (checkForError(result, *tracker)) { GetIterReply::SP reply(new GetIterReply(cmd)); reply->getEntries() = result.steal_entries(); @@ -381,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd) MessageTracker::UP PersistenceThread::handleReadBucketList(ReadBucketList& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketList, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock()); spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition())); if (checkForError(result, *tracker)) { - ReadBucketListReply::SP reply(new ReadBucketListReply(cmd)); + auto reply = std::make_shared<ReadBucketListReply>(cmd); result.getList().swap(reply->getBuckets()); tracker->setReply(reply); } @@ -398,32 +353,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd) MessageTracker::UP PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketInfo, - _env._component.getClock())); - - _env.updateBucketDatabase(cmd.getBucket(), - _env.getBucketInfo(cmd.getBucket())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock()); + _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); return tracker; } MessageTracker::UP PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createIterator, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFields()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); // _context is reset per command, so it's safe to modify it like this. _context.setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, - cmd.getSelection(), - cmd.getIncludedVersions(), - _context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context)); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } @@ -440,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Calculate the various bucket ids involved. if (cmd.getBucketId().getUsedBits() >= 58) { - tracker->fail( - api::ReturnCode::ILLEGAL_PARAMETERS, - "Can't split anymore since maximum split bits " - "is already reached"); + tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, + "Can't split anymore since maximum split bits is already reached"); return tracker; } if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { @@ -463,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); document::BucketId target1(src.getUsedBits() + 1, src.getId()); - document::BucketId target2(src.getUsedBits() + 1, src.getId() - | (uint64_t(1) << src.getUsedBits())); + document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits())); targetInfo = SplitBitDetector::Result(target1, target2, false); } if (targetInfo.failed()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, - targetInfo.getReason()); + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason()); return tracker; } // If we get here, we're splitting data in two. @@ -1069,23 +1010,18 @@ PersistenceThread::flushAllReplies( if (errorCode != 0) { for (uint32_t i = 0; i < replies.size(); ++i) { replies[i]->getReply()->setResult( - api::ReturnCode( - (api::ReturnCode::Result)errorCode, - result.getErrorMessage())); + api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); } } } catch (std::exception& e) { for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, e.what())); + replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); } } for (uint32_t i = 0; i < replies.size(); ++i) { - LOG(spam, - "Sending reply up (batched): %s %zu", - replies[i]->getReply()->toString().c_str(), - replies[i]->getReply()->getMsgId()); + LOG(spam, "Sending reply up (batched): %s %zu", + replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId()); _env._fileStorHandler.sendReply(replies[i]->getReply()); } @@ -1097,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) std::vector<MessageTracker::UP> trackers; document::Bucket bucket = lock.first->getBucket(); - while (lock.second.get() != 0) { + while (lock.second) { LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); std::shared_ptr<api::StorageMessage> msg(lock.second); @@ -1110,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } std::unique_ptr<MessageTracker> tracker = processMessage(*msg); - if (!tracker.get() || !tracker->getReply().get()) { + if (!tracker || !tracker->getReply()) { // Was a reply break; } @@ -1122,8 +1058,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } if (batchable) { LOG(spam, "Adding reply %s to batch for bucket %s", - tracker->getReply()->toString().c_str(), - bucket.getBucketId().toString().c_str()); + tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str()); trackers.push_back(std::move(tracker)); @@ -1133,10 +1068,8 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) break; } } else { - LOG(spam, - "Sending reply up: %s %zu", - tracker->getReply()->toString().c_str(), - tracker->getReply()->getMsgId()); + LOG(spam, "Sending reply up: %s %zu", + tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); _env._fileStorHandler.sendReply(tracker->getReply()); break; @@ -1151,14 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread with pid %d", getpid()); - while (!thread.interrupted() - && !_env._fileStorHandler.closed(_env._partition)) - { + while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) { thread.registerTick(); FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition)); - if (lock.first.get()) { + if (lock.first) { processMessages(lock); } |