summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-06-07 16:47:01 +0200
committerHenning Baldersheim <balder@oath.com>2018-06-12 09:56:59 +0200
commit498bd3df67182d16e021fccd5514c578838ba2bf (patch)
treeb43692b3621746ff2617ff6ba871d8d459ef57e7 /storage
parent89521c8531b0eeb03bd28f899ecd59a9f8b334a6 (diff)
Reduce code visibility
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp1
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp1
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp1
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp3
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h6
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp1
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp255
8 files changed, 90 insertions, 179 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index d42330086f0..d12e6b90b2a 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -5,6 +5,7 @@
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index fea4ae15efc..81b0293b0c0 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/test/make_document_bucket.h>
using document::test::makeDocumentBucket;
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 9de304db497..fd880effda9 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -8,6 +8,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/operations/external/updateoperation.h>
#include <vespa/vespalib/testkit/test_kit.h>
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 396cc0d70de..f15921e447d 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -1,13 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "persistencetestutils.h"
#include <vespa/document/datatype/documenttype.h>
#include <vespa/storageapi/message/persistence.h>
-#include <tests/persistence/persistencetestutils.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 22ab954cc6a..34d4c397f09 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -145,11 +145,11 @@ public:
bool headerOnly)
{ return doGetOnDisk(0, bucketId, docId, headerOnly); }
- document::DocumentUpdate::SP createBodyUpdate(
+ std::shared_ptr<document::DocumentUpdate> createBodyUpdate(
const document::DocumentId& id,
const document::FieldValue& updateValue);
- document::DocumentUpdate::SP createHeaderUpdate(
+ std::shared_ptr<document::DocumentUpdate> createHeaderUpdate(
const document::DocumentId& id,
const document::FieldValue& updateValue);
@@ -172,7 +172,7 @@ public:
uint16_t disk = 0);
spi::UpdateResult doUpdate(document::BucketId bid,
- const document::DocumentUpdate::SP& update,
+ const std::shared_ptr<document::DocumentUpdate>& update,
spi::Timestamp time,
uint16_t disk = 0);
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index caf24c4a8fe..c729df1e7eb 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -6,6 +6,7 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
#include <vespa/document/fieldvalue/fieldvalues.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/persistence/spi/test.h>
#include <functional>
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 170b5385f72..7df598bed97 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -13,6 +13,7 @@
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/storageserver/changedbucketownershiphandler.h>
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 768e104e02b..0eeb4eeb84d 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -7,6 +7,7 @@
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
@@ -192,9 +193,8 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
api::BucketInfo before = _env.getBucketInfo(cmd.getBucket());
- spi::Result result =
- _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- cmd.verifyBody() ? spi::HIGH : spi::LOW);
+ spi::Result result = _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
+ cmd.verifyBody() ? spi::HIGH : spi::LOW);
if (checkForError(result, *tracker)) {
api::BucketInfo after = _env.getBucketInfo(cmd.getBucket());
@@ -229,8 +229,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.",
- cmd.getBucketId().toString().c_str());
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
}
spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition));
@@ -250,22 +249,18 @@ bool bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::Bucke
// where Proton starts reporting actual document sizes, and will eventually
// converge to a stable value. But for now, ignore it to prevent false positive
// error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum())
- && (a.getDocumentCount() == b.getDocumentCount()));
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
}
}
bool
-PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
- const api::BucketInfo& info) const
+PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
{
spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
if (result.hasError()) {
- LOG(error,
- "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(),
- result.getErrorMessage().c_str());
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
return false;
}
api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo()));
@@ -280,9 +275,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket,
"bucket has %s. Deletion has been rejected to ensure data is not "
"lost, but bucket may remain out of sync until service has been "
"restarted.",
- bucket.toString().c_str(),
- info.toString().c_str(),
- providerInfo.toString().c_str());
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
return false;
}
return true;
@@ -387,9 +380,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
MessageTracker::UP
PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.splitBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
// Calculate the various bucket ids involved.
@@ -400,17 +391,15 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
- "Max lit bits must be set higher "
- "than the number of bits used in the bucket to split");
+ "Max lit bits must be set higher than the number of bits used in the bucket to split");
return tracker;
}
spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition));
SplitBitDetector::Result targetInfo;
if (_env._config.enableMultibitSplitOptimalization) {
- targetInfo = SplitBitDetector::detectSplit(
- _spi, spiBucket, cmd.getMaxSplitBits(),
- _context, cmd.getMinDocCount(), cmd.getMinByteSize());
+ targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
+ _context, cmd.getMinDocCount(), cmd.getMinByteSize());
}
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
@@ -449,13 +438,10 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
}
#endif
- spi::Result result = _spi.split(
- spiBucket,
- spi::Bucket(target1, spi::PartitionId(lock1.disk)),
- spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context);
+ spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)),
+ spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context);
if (result.hasError()) {
- tracker->fail(_env.convertErrorCode(result),
- result.getErrorMessage());
+ tracker->fail(_env.convertErrorCode(result), result.getErrorMessage());
return tracker;
}
// After split we need to take all bucket db locks to update them.
@@ -466,8 +452,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
api::SplitBucketReply & splitReply = *reply;
tracker->setReply(std::move(reply));
- typedef std::pair<StorBucketDatabase::WrappedEntry,
- FileStorHandler::RemapInfo> TargetInfo;
+ typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo;
std::vector<TargetInfo> targets;
for (uint32_t i = 0; i < 2; i++) {
const document::Bucket &target(i == 0 ? target1 : target2);
@@ -478,8 +463,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
target.getBucketId(), "PersistenceThread::handleSplitBucket - Target",
StorBucketDatabase::CREATE_IF_NONEXISTING),
FileStorHandler::RemapInfo(target, disk)));
- targets.back().first->setBucketInfo(
- _env.getBucketInfo(target, disk));
+ targets.back().first->setBucketInfo(_env.getBucketInfo(target, disk));
targets.back().first->disk = disk;
}
if (LOG_WOULD_LOG(spam)) {
@@ -494,41 +478,30 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
targ2.getMetaCount());
}
FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition);
- _env._fileStorHandler.remapQueueAfterSplit(
- source, targets[0].second, targets[1].second);
- bool ownershipChanged(
- !_bucketOwnershipNotifier->distributorOwns(
- cmd.getSourceIndex(), cmd.getBucket()));
+ _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second);
+ bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket()));
// Now release all the bucketdb locks.
for (uint32_t i = 0; i < targets.size(); i++) {
if (ownershipChanged) {
- notifyGuard.notifyAlways(targets[i].second.bucket,
- targets[i].first->getBucketInfo());
+ notifyGuard.notifyAlways(targets[i].second.bucket, targets[i].first->getBucketInfo());
}
// The entries vector has the source bucket in element zero, so indexing
// that with i+1
- if (targets[i].second.foundInQueue
- || targets[i].first->getMetaCount() > 0)
- {
+ if (targets[i].second.foundInQueue || targets[i].first->getMetaCount() > 0) {
if (targets[i].first->getMetaCount() == 0) {
// Fake that the bucket has content so it is not deleted.
targets[i].first->info.setMetaCount(1);
// Must make sure target bucket exists when we have pending ops
// to an empty target bucket, since the provider will have
// implicitly erased it by this point.
- spi::Bucket createTarget(
- spi::Bucket(targets[i].second.bucket,
- spi::PartitionId(targets[i].second.diskIndex)));
- LOG(debug,
- "Split target %s was empty, but re-creating it since "
- "there are remapped operations queued to it",
+ spi::Bucket createTarget(spi::Bucket(targets[i].second.bucket,
+ spi::PartitionId(targets[i].second.diskIndex)));
+ LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
createTarget.toString().c_str());
_spi.createBucket(createTarget, _context);
}
- splitReply.getSplitInfo().push_back(
- api::SplitBucketReply::Entry(
- targets[i].second.bucket.getBucketId(),
- targets[i].first->getBucketInfo()));
+ splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(),
+ targets[i].first->getBucketInfo());
targets[i].first.write();
} else {
targets[i].first.remove();
@@ -536,8 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
if (sourceEntry.exist()) {
if (ownershipChanged) {
- notifyGuard.notifyAlways(cmd.getBucket(),
- sourceEntry->getBucketInfo());
+ notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo());
}
// Delete the old entry.
sourceEntry.remove();
@@ -546,30 +518,24 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
bool
-PersistenceThread::validateJoinCommand(
- const api::JoinBucketsCommand& cmd,
- MessageTracker& tracker) const
+PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const
{
if (cmd.getSourceBuckets().size() != 2) {
tracker.fail(ReturnCode::ILLEGAL_PARAMETERS,
- "Join needs exactly two buckets to be joined together"
- + cmd.getBucketId().toString());
+ "Join needs exactly two buckets to be joined together" + cmd.getBucketId().toString());
return false;
}
// Verify that source and target buckets look sane.
for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) {
if (cmd.getSourceBuckets()[i] == cmd.getBucketId()) {
tracker.fail(ReturnCode::ILLEGAL_PARAMETERS,
- "Join had both source and target bucket "
- + cmd.getBucketId().toString());
+ "Join had both source and target bucket " + cmd.getBucketId().toString());
return false;
}
if (!cmd.getBucketId().contains(cmd.getSourceBuckets()[i])) {
tracker.fail(ReturnCode::ILLEGAL_PARAMETERS,
- "Source bucket " +
- cmd.getSourceBuckets()[i].toString()
- + " is not contained in target "
- + cmd.getBucketId().toString());
+ "Source bucket " + cmd.getSourceBuckets()[i].toString()
+ + " is not contained in target " + cmd.getBucketId().toString());
return false;
}
}
@@ -579,9 +545,7 @@ PersistenceThread::validateJoinCommand(
MessageTracker::UP
PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.joinBuckets,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock());
if (!validateJoinCommand(cmd, *tracker)) {
return tracker;
}
@@ -594,11 +558,8 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
{
// Create empty bucket for target.
StorBucketDatabase::WrappedEntry entry =
- _env.getBucketDatabase(destBucket.getBucketSpace()).get(
- destBucket.getBucketId(),
- "join",
- StorBucketDatabase::CREATE_IF_NONEXISTING);
-
+ _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join",
+ StorBucketDatabase::CREATE_IF_NONEXISTING);
entry->disk = _env._partition;
entry.write();
}
@@ -644,13 +605,10 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]);
uint16_t disk = (i == 0) ? lock1.disk : lock2.disk;
FileStorHandler::RemapInfo target(cmd.getBucket(), _env._partition);
- _env._fileStorHandler.remapQueueAfterJoin(
- FileStorHandler::RemapInfo(srcBucket, disk),
- target);
+ _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket, disk), target);
// Remove source from bucket db.
- StorBucketDatabase::WrappedEntry entry(
- _env.getBucketDatabase(srcBucket.getBucketSpace()).get(
- srcBucket.getBucketId(), "join-remove-source"));
+ StorBucketDatabase::WrappedEntry entry =
+ _env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source");
if (entry.exist()) {
lastModified = std::max(lastModified, entry->info.getLastModified());
entry.remove();
@@ -658,13 +616,10 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
}
{
StorBucketDatabase::WrappedEntry entry =
- _env.getBucketDatabase(destBucket.getBucketSpace()).get(
- destBucket.getBucketId(),
- "join",
- StorBucketDatabase::CREATE_IF_NONEXISTING);
+ _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join",
+ StorBucketDatabase::CREATE_IF_NONEXISTING);
if (entry->info.getLastModified() == 0) {
- entry->info.setLastModified(
- std::max(lastModified, entry->info.getLastModified()));
+ entry->info.setLastModified(std::max(lastModified, entry->info.getLastModified()));
}
entry.write();
}
@@ -674,28 +629,21 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
MessageTracker::UP
PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.setBucketStates,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(
- shouldBeActive
- ? spi::BucketInfo::ACTIVE
- : spi::BucketInfo::NOT_ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
spi::Result result(_spi.setActiveState(bucket, newState));
if (checkForError(result, *tracker)) {
- StorBucketDatabase::WrappedEntry entry(_env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(
- cmd.getBucketId(), "handleSetBucketState"));
+ StorBucketDatabase::WrappedEntry
+ entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
if (entry.exist()) {
entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(),
- cmd.getSourceIndex(),
- entry->info);
+ notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
entry.write();
} else {
LOG(warning, "Got OK setCurrentState result from provider for %s, "
@@ -703,8 +651,7 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
cmd.getBucketId().toString().c_str());
}
- tracker->setReply(api::StorageReply::SP(
- new api::SetBucketStateReply(cmd)));
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
}
return tracker;
@@ -713,17 +660,13 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
MessageTracker::UP
PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.internalJoin,
- _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock());
document::Bucket destBucket = cmd.getBucket();
{
// Create empty bucket for target.
StorBucketDatabase::WrappedEntry entry =
_env.getBucketDatabase(destBucket.getBucketSpace()).get(
- destBucket.getBucketId(),
- "join",
- StorBucketDatabase::CREATE_IF_NONEXISTING);
+ destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING);
entry->disk = _env._partition;
entry.write();
@@ -734,10 +677,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd)
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())),
_context);
if (checkForError(result, *tracker)) {
- tracker->setReply(
- api::StorageReply::SP(
- new InternalBucketJoinReply(cmd,
- _env.getBucketInfo(cmd.getBucket()))));
+ tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket())));
}
return tracker;
}
@@ -745,17 +685,14 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd)
MessageTracker::UP
PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.recheckBucketInfo, _env._component.getClock()));
+ auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock());
document::Bucket bucket(cmd.getBucket());
api::BucketInfo info(_env.getBucketInfo(bucket));
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
{
// Update bucket database
StorBucketDatabase::WrappedEntry entry(
- _component->getBucketDatabase(bucket.getBucketSpace()).get(
- bucket.getBucketId(),
- "handleRecheckBucketInfo"));
+ _component->getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo"));
if (entry.exist()) {
api::BucketInfo prevInfo(entry->getBucketInfo());
@@ -799,30 +736,23 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg));
// Depends on iterators
case api::MessageType::STATBUCKET_ID:
- return _processAllHandler.handleStatBucket(
- static_cast<api::StatBucketCommand&>(msg), _context);
+ return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), _context);
case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(
- static_cast<api::RemoveLocationCommand&>(msg), _context);
+ return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), _context);
case api::MessageType::MERGEBUCKET_ID:
- return _mergeHandler.handleMergeBucket(
- static_cast<api::MergeBucketCommand&>(msg), _context);
+ return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), _context);
case api::MessageType::GETBUCKETDIFF_ID:
- return _mergeHandler.handleGetBucketDiff(
- static_cast<api::GetBucketDiffCommand&>(msg), _context);
+ return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), _context);
case api::MessageType::APPLYBUCKETDIFF_ID:
- return _mergeHandler.handleApplyBucketDiff(
- static_cast<api::ApplyBucketDiffCommand&>(msg), _context);
+ return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), _context);
case api::MessageType::SETBUCKETSTATE_ID:
- return handleSetBucketState(
- static_cast<api::SetBucketStateCommand&>(msg));
+ return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
return handleGetIter(static_cast<GetIterCommand&>(msg));
case CreateIteratorCommand::ID:
- return handleCreateIterator(
- static_cast<CreateIteratorCommand&>(msg));
+ return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg));
case ReadBucketList::ID:
return handleReadBucketList(static_cast<ReadBucketList&>(msg));
case ReadBucketInfo::ID:
@@ -830,18 +760,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
case RepairBucketCommand::ID:
return handleRepairBucket(static_cast<RepairBucketCommand&>(msg));
case BucketDiskMoveCommand::ID:
- return _diskMoveHandler.handleBucketDiskMove(
- static_cast<BucketDiskMoveCommand&>(msg), _context);
+ return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), _context);
case InternalBucketJoinCommand::ID:
- return handleInternalBucketJoin(
- static_cast<InternalBucketJoinCommand&>(msg));
+ return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg));
case RecheckBucketInfoCommand::ID:
- return handleRecheckBucketInfo(
- static_cast<RecheckBucketInfoCommand&>(msg));
+ return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg));
default:
- LOG(warning,
- "Persistence thread received unhandled internal command %s",
- msg.toString().c_str());
+ LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
break;
}
default:
@@ -853,13 +778,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
MessageTracker::UP
PersistenceThread::handleCommand(api::StorageCommand& msg)
{
- _context = spi::Context(msg.getLoadType(), msg.getPriority(),
- msg.getTrace().getLevel());
+ _context = spi::Context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel());
MessageTracker::UP mtracker(handleCommandSplitByType(msg));
if (mtracker.get() != 0) {
if (mtracker->getReply().get() != 0) {
- mtracker->getReply()->getTrace().getRoot().addChild(
- _context.getTrace().getRoot());
+ mtracker->getReply()->getTrace().getRoot().addChild(_context.getTrace().getRoot());
} else {
msg.getTrace().getRoot().addChild(_context.getTrace().getRoot());
}
@@ -872,14 +795,10 @@ PersistenceThread::handleReply(api::StorageReply& reply)
{
switch (reply.getType().getId()) {
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleGetBucketDiffReply(
- static_cast<api::GetBucketDiffReply&>(reply),
- _env._fileStorHandler);
+ _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
break;
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(
- static_cast<api::ApplyBucketDiffReply&>(reply),
- _env._fileStorHandler);
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
break;
default:
break;
@@ -889,8 +808,7 @@ PersistenceThread::handleReply(api::StorageReply& reply)
MessageTracker::UP
PersistenceThread::processMessage(api::StorageMessage& msg)
{
- MBUS_TRACE(msg.getTrace(), 5,
- "PersistenceThread: Processing message in persistence layer");
+ MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer");
++_env._metrics.operations;
if (msg.getType().isReply()) {
@@ -903,8 +821,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
}
} else {
- api::StorageCommand& initiatingCommand =
- static_cast<api::StorageCommand&>(msg);
+ api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg);
try {
int64_t startTime(_component->getClock().getTimeInMillis().getTime());
@@ -913,8 +830,7 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
LOG(spam, "Message content: %s", msg.toString(true).c_str());
auto tracker(handleCommand(initiatingCommand));
if (!tracker.get()) {
- LOG(debug, "Received unsupported command %s",
- msg.getType().getName().c_str());
+ LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str());
} else {
tracker->generateReply(initiatingCommand);
if ((tracker->getReply().get()
@@ -925,29 +841,21 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
}
}
- int64_t stopTime(
- _component->getClock().getTimeInMillis().getTime());
+ int64_t stopTime(_component->getClock().getTimeInMillis().getTime());
if (stopTime - startTime >= _warnOnSlowOperations) {
LOGBT(warning, msg.getType().toString(),
- "Slow processing of message %s on disk %u. "
- "Processing time: %" PRId64 " ms (>=%d ms)",
- msg.toString().c_str(), _env._partition,
- stopTime - startTime, _warnOnSlowOperations);
+ "Slow processing of message %s on disk %u. Processing time: %" PRId64 " ms (>=%d ms)",
+ msg.toString().c_str(), _env._partition, stopTime - startTime, _warnOnSlowOperations);
} else {
- LOGBT(spam, msg.getType().toString(),
- "Processing time of message %s on disk %u: %" PRId64 " ms",
- msg.toString(true).c_str(), _env._partition,
- stopTime - startTime);
+ LOGBT(spam, msg.getType().toString(), "Processing time of message %s on disk %u: %" PRId64 " ms",
+ msg.toString(true).c_str(), _env._partition, stopTime - startTime);
}
return tracker;
} catch (std::exception& e) {
- LOG(debug, "Caught exception for %s: %s",
- msg.toString().c_str(),
- e.what());
+ LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
api::StorageReply::SP reply(initiatingCommand.makeReply().release());
- reply->setResult(api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, e.what()));
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
_env._fileStorHandler.sendReply(reply);
}
}
@@ -1016,8 +924,7 @@ PersistenceThread::flushAllReplies(
uint32_t errorCode = _env.convertErrorCode(result);
if (errorCode != 0) {
for (uint32_t i = 0; i < replies.size(); ++i) {
- replies[i]->getReply()->setResult(
- api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
+ replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage()));
}
}
} catch (std::exception& e) {
@@ -1041,8 +948,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
document::Bucket bucket = lock.first->getBucket();
while (lock.second) {
- LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p",
- _env._partition, _env._nodeIndex, lock.second.get());
+ LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
std::shared_ptr<api::StorageMessage> msg(lock.second);
bool batchable = isBatchable(*msg);
@@ -1077,7 +983,6 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
} else {
LOG(spam, "Sending reply up: %s %zu",
tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId());
-
_env._fileStorHandler.sendReply(tracker->getReply());
break;
}