author     Henning Baldersheim <balder@oath.com>    2018-03-16 18:39:07 +0100
committer  Henning Baldersheim <balder@oath.com>    2018-03-19 13:01:17 +0100
commit     3790a0da41ddd043c5a4e561a1166b6a9bcceaaf (patch)
tree       e14d9f35030f6de2c7771304682b3ba987a62234
parent     b5de45250c79c35635b67ecfccdc181a910ded20 (diff)
std::make_ instead of 'new'.

Conflicts:
    storage/src/vespa/storage/persistence/persistencethread.cpp

Resolve merge conflict with removed code.
-rw-r--r--  storage/src/vespa/storage/common/storagelink.cpp                            | 159
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp  |  12
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp                 | 163
3 files changed, 95 insertions, 239 deletions
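
The change replaces explicit 'new' expressions wrapped in smart-pointer constructors with std::make_unique / std::make_shared throughout these files. A minimal sketch of the pattern (not Vespa code; Tracker and Reply are hypothetical stand-ins for the storage types):

    #include <memory>
    #include <string>

    struct Tracker {
        explicit Tracker(std::string name) : _name(std::move(name)) {}
        std::string _name;
    };

    struct Reply {
        explicit Reply(int code) : _code(code) {}
        int _code;
    };

    std::unique_ptr<Tracker> makeTrackerOld() {
        // Before: a naked 'new' handed to the smart-pointer constructor.
        std::unique_ptr<Tracker> tracker(new Tracker("put"));
        return tracker;
    }

    std::unique_ptr<Tracker> makeTrackerNew() {
        // After: std::make_unique expresses the same thing without 'new'.
        return std::make_unique<Tracker>("put");
    }

    std::shared_ptr<Reply> makeReplyNew() {
        // Shared ownership follows the same pattern with std::make_shared,
        // which also allocates the object and control block together.
        return std::make_shared<Reply>(0);
    }

    int main() {
        auto t = makeTrackerNew();
        auto r = makeReplyNew();
        return (t && r) ? 0 : 1;
    }
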
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 8636628c7cc..b53c8c270f2 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,21 +14,16 @@ using namespace storage::api;
namespace storage {
-StorageLink::~StorageLink()
-{
-}
+StorageLink::~StorageLink() = default;
void StorageLink::push_back(StorageLink::UP link)
{
if (_state != CREATED) {
- LOG(error, "Attempted to alter chain by adding link %s after link %s "
- "while state is %s",
- link->toString().c_str(),
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s",
+ link->toString().c_str(), toString().c_str(), stateToString(_state));
assert(false);
}
- assert(link.get());
+ assert(link);
if (isBottom()) {
link->_up = this;
_down = std::move(link);
@@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link)
void StorageLink::open()
{
- // First tag all states as opened, as components are allowed to send
- // messages both ways in onOpen call, in case any component send message
- // up, the link receiving them should have their state as opened.
+ // First tag all states as opened, as components are allowed to send
+ // messages both ways in onOpen call, in case any component send message
+ // up, the link receiving them should have their state as opened.
StorageLink* link = this;
while (true) {
if (link->_state != CREATED) {
- LOG(error, "During open(), link %s should be in CREATED state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(link->_state));
+ LOG(error, "During open(), link %s should be in CREATED state, not in state %s.",
+ toString().c_str(), stateToString(link->_state));
assert(false);
}
link->_state = OPENED;
if (link->_down.get() == 0) break;
link = link->_down.get();
}
- // When give all links an onOpen call, bottoms up. Do it bottoms up, as
- // links are more likely to send messages down in their onOpen() call
- // than up. Thus, chances are best that the component is ready to
- // receive messages sent during onOpen().
+ // When give all links an onOpen call, bottoms up. Do it bottoms up, as
+ // links are more likely to send messages down in their onOpen() call
+ // than up. Thus, chances are best that the component is ready to
+ // receive messages sent during onOpen().
while (link != 0) {
link->onOpen();
link = link->_up;
@@ -91,34 +84,31 @@ void StorageLink::closeNextLink() {
void StorageLink::flush()
{
if (_state != CLOSING) {
- LOG(error, "During flush(), link %s should be in CLOSING state, "
- "not in state %s.",
- toString().c_str(),
- stateToString(_state));
+ LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.",
+ toString().c_str(), stateToString(_state));
assert(false);
}
- // First flush down to get all requests out of the system.
+ // First flush down to get all requests out of the system.
_state = FLUSHINGDOWN;
LOG(debug, "Flushing link %s on the way down.", toString().c_str());
onFlush(true);
LOG(debug, "Flushed link %s on the way down.", toString().c_str());
if (!isBottom()) {
_down->flush();
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
} else {
- // Then flush up to get replies out of the system
+ // Then flush up to get replies out of the system
LOG(debug, "Flushing link %s on the way back up.", toString().c_str());
_state = FLUSHINGUP;
onFlush(false);
LOG(debug, "Flushed link %s on the way back up.", toString().c_str());
}
_state = CLOSED;
- LOG(debug, "Link %s is now closed and should do nothing more.",
- toString().c_str());
+ LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str());
}
void StorageLink::sendDown(const StorageMessage::SP& msg)
@@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case FLUSHINGDOWN:
break;
default:
- LOG(error,
- "Link %s trying to send %s down while in state %s",
- toString().c_str(),
- msg->toString().c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s down while in state %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
- LOG(spam, "Storage Link %s to handle %s",
- toString().c_str(), msg->toString().c_str());
+ LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
- LOG(spam, "Storage link %s at bottom of chain got message %s.",
- toString().c_str(), msg->toString().c_str());
- /*
- if (isFlush(msg)) {
+ LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
+ ostringstream ost;
+ ost << "Unhandled message at bottom of chain " << *msg << " (message type "
+ << msg->getType().getName() << "). " << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendUp(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at bottom of chain "
- << *msg << " (message type "
- << msg->getType().getName()
- << "). "
- << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendUp(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_down->onDown(msg)) {
- //LOG(spam, "Storage link %s forwarding message %s.",
- // toString().c_str(), msg->toString().c_str());
_down->sendDown(msg);
} else {
LOG(spam, "Storage link %s handled message %s.",
@@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
{
- // Verify acceptable state to send messages up
+ // Verify acceptable state to send messages up
switch(_state) {
case OPENED:
case CLOSING:
@@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error,
- "Link %s trying to send %s up while in state %s",
- toString().c_str(),
- msg->toString(true).c_str(),
- stateToString(_state));
+ LOG(error, "Link %s trying to send %s up while in state %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(_state));
assert(false);
}
assert(msg.get());
if (isTop()) {
- /*
- if (isFlush(msg)) {
+ ostringstream ost;
+ ost << "Unhandled message at top of chain " << *msg << ".";
+ ost << vespalib::getStackTrace(0);
+ if (!msg->getType().isReply()) {
+ LOGBP(warning, "%s", ost.str().c_str());
StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
shared_ptr<StorageReply> reply(cmd.makeReply().release());
if (reply.get()) {
+ reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName()));
sendDown(reply);
}
} else {
- */
- ostringstream ost;
- ost << "Unhandled message at top of chain " << *msg << ".";
- ost << vespalib::getStackTrace(0);
- if (!msg->getType().isReply()) {
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- StorageCommand& cmd = static_cast<StorageCommand&>(*msg);
- shared_ptr<StorageReply> reply(cmd.makeReply().release());
-
- if (reply.get()) {
- reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED,
- msg->getType().getName()));
- sendDown(reply);
- }
- } else {
- ost << " Return code: "
- << static_cast<StorageReply&>(*msg).getResult();
- //if (!_closed) {
- LOGBP(warning, "%s", ost.str().c_str());
- //}
- }
- //}
+ ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult();
+ LOGBP(warning, "%s", ost.str().c_str());
+ }
} else if (!_up->onUp(msg)) {
_up->sendUp(msg);
}
@@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const {
bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg)
{
- //LOG(spam, "Checking if storage link %s handles %s.",
- // toString().c_str(), msg->toString().c_str());
- bool result = msg->callHandler(*this, msg);
- /*
- if (result) {
- LOG(spam, "Storage link %s handled message %s.",
- toString().c_str(), msg->toString().c_str());
- } else {
- LOG(spam, "Storage link %s did not handle message %s.",
- toString().c_str(), msg->toString().c_str());
- }
- */
- return result;
+ return msg->callHandler(*this, msg);
}
bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 75f389815df..5eb168a9a42 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -559,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
- LOG(spam,
- "Acquiring filestor lock for %s on disk %d",
- bucket.getBucketId().toString().c_str(),
- disk);
+ LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk);
vespalib::MonitorGuard lockGuard(t.lock);
while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
- LOG(spam,
- "Contending for filestor lock for %s",
- bucket.getBucketId().toString().c_str());
+ LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
lockGuard.wait(100);
}
- std::shared_ptr<FileStorHandler::BucketLockInterface> locker(
- new BucketLock(lockGuard, t, bucket, 255, "External lock"));
+ auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock");
lockGuard.broadcast();
return locker;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index fded9c87978..be6f9577642 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
-#include <algorithm>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.thread");
@@ -105,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
MessageTracker::UP
PersistenceThread::handlePut(api::PutCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.put[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::Result response =
- _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocument(),
- _context);
+ spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context);
checkForError(response, *tracker);
return tracker;
}
@@ -125,18 +119,14 @@ PersistenceThread::handlePut(api::PutCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRemove(api::RemoveCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.remove[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::RemoveResult response =
- _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getDocumentId(), _context);
+ spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context);
if (checkForError(response, *tracker)) {
tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -149,18 +139,14 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd)
MessageTracker::UP
PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.update[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock());
if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
return tracker;
}
- spi::UpdateResult response =
- _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()),
- cmd.getUpdate(), _context);
+ spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context);
if (checkForError(response, *tracker)) {
auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
@@ -172,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGet(api::GetCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.get[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFieldSet());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- *fieldSet,
- cmd.getDocumentId(),
- _context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
++_env._metrics.get[cmd.getLoadType()].notFound;
}
-
- api::GetReply::UP reply(
- new api::GetReply(cmd,
- Document::SP(result.getDocumentPtr()),
- result.getTimestamp()));
-
- tracker->setReply(api::StorageReply::SP(reply.release()));
+ tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp()));
}
return tracker;
@@ -204,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.repairs,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
- LOG(debug, "Repair(%s): %s",
- cmd.getBucketId().toString().c_str(),
+ LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
api::BucketInfo before = _env.getBucketInfo(cmd.getBucket());
spi::Result result =
- _spi.maintain(spi::Bucket(cmd.getBucket(),
- spi::PartitionId(_env._partition)),
- cmd.verifyBody() ?
- spi::HIGH : spi::LOW);
+ _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
+ cmd.verifyBody() ? spi::HIGH : spi::LOW);
if (checkForError(result, *tracker)) {
api::BucketInfo after = _env.getBucketInfo(cmd.getBucket());
@@ -236,9 +205,7 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRevert(api::RevertCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.revert[cmd.getLoadType()],
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
for (const api::Timestamp & token : tokens) {
@@ -250,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd)
MessageTracker::UP
PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.",
@@ -294,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
result.getErrorMessage().c_str());
return false;
}
- api::BucketInfo providerInfo(
- _env.convertBucketInfo(result.getBucketInfo()));
+ api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo()));
// Don't check meta fields or active/ready fields since these are not
// that important and ready may change under the hood in a race with
// getModifiedBuckets(). If bucket is empty it means it has already
@@ -318,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
MessageTracker::UP
PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.deleteBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
_env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket was deleted during the merge"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
}
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
@@ -335,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
_spi.deleteBucket(bucket, _context);
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
- StorBucketDatabase::WrappedEntry entry(db.get(
- cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
if (entry.exist() && entry->getMetaCount() > 0) {
LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
"active operation when delete bucket was queued. "
@@ -360,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
MessageTracker::UP
PersistenceThread::handleGetIter(GetIterCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.visit[cmd.getLoadType()],
- _env._component.getClock()));
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(),
- cmd.getMaxByteSize(), _context));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context));
if (checkForError(result, *tracker)) {
GetIterReply::SP reply(new GetIterReply(cmd));
reply->getEntries() = result.steal_entries();
@@ -381,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketList,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock());
spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition()));
if (checkForError(result, *tracker)) {
- ReadBucketListReply::SP reply(new ReadBucketListReply(cmd));
+ auto reply = std::make_shared<ReadBucketListReply>(cmd);
result.getList().swap(reply->getBuckets());
tracker->setReply(reply);
}
@@ -398,32 +353,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
MessageTracker::UP
PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.readBucketInfo,
- _env._component.getClock()));
-
- _env.updateBucketDatabase(cmd.getBucket(),
- _env.getBucketInfo(cmd.getBucket()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock());
+ _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
return tracker;
}
MessageTracker::UP
PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.createIterator,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(),
- cmd.getFields());
+ document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
// _context is reset per command, so it's safe to modify it like this.
_context.setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet,
- cmd.getSelection(),
- cmd.getIncludedVersions(),
- _context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context));
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
@@ -440,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Calculate the various bucket ids involved.
if (cmd.getBucketId().getUsedBits() >= 58) {
- tracker->fail(
- api::ReturnCode::ILLEGAL_PARAMETERS,
- "Can't split anymore since maximum split bits "
- "is already reached");
+ tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "Can't split anymore since maximum split bits is already reached");
return tracker;
}
if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
@@ -463,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
document::BucketId target1(src.getUsedBits() + 1, src.getId());
- document::BucketId target2(src.getUsedBits() + 1, src.getId()
- | (uint64_t(1) << src.getUsedBits()));
+ document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits()));
targetInfo = SplitBitDetector::Result(target1, target2, false);
}
if (targetInfo.failed()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE,
- targetInfo.getReason());
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason());
return tracker;
}
// If we get here, we're splitting data in two.
@@ -1069,23 +1010,18 @@ PersistenceThread::flushAllReplies(
if (errorCode != 0) {
for (uint32_t i = 0; i < replies.size(); ++i) {
replies[i]->getReply()->setResult(
- api::ReturnCode(
- (api::ReturnCode::Result)errorCode,
- result.getErrorMessage()));
+ api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
}
}
} catch (std::exception& e) {
for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, e.what()));
+ replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
}
}
for (uint32_t i = 0; i < replies.size(); ++i) {
- LOG(spam,
- "Sending reply up (batched): %s %zu",
- replies[i]->getReply()->toString().c_str(),
- replies[i]->getReply()->getMsgId());
+ LOG(spam, "Sending reply up (batched): %s %zu",
+ replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId());
_env._fileStorHandler.sendReply(replies[i]->getReply());
}
@@ -1097,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
std::vector<MessageTracker::UP> trackers;
document::Bucket bucket = lock.first->getBucket();
- while (lock.second.get() != 0) {
+ while (lock.second) {
LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p",
_env._partition, _env._nodeIndex, lock.second.get());
std::shared_ptr<api::StorageMessage> msg(lock.second);
@@ -1110,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
- if (!tracker.get() || !tracker->getReply().get()) {
+ if (!tracker || !tracker->getReply()) {
// Was a reply
break;
}
@@ -1122,8 +1058,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
if (batchable) {
LOG(spam, "Adding reply %s to batch for bucket %s",
- tracker->getReply()->toString().c_str(),
- bucket.getBucketId().toString().c_str());
+ tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str());
trackers.push_back(std::move(tracker));
@@ -1133,10 +1068,8 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
break;
}
} else {
- LOG(spam,
- "Sending reply up: %s %zu",
- tracker->getReply()->toString().c_str(),
- tracker->getReply()->getMsgId());
+ LOG(spam, "Sending reply up: %s %zu",
+ tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
_env._fileStorHandler.sendReply(tracker->getReply());
break;
@@ -1151,14 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread with pid %d", getpid());
- while (!thread.interrupted()
- && !_env._fileStorHandler.closed(_env._partition))
- {
+ while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) {
thread.registerTick();
FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition));
- if (lock.first.get()) {
+ if (lock.first) {
processMessages(lock);
}