author    Henning Baldersheim <balder@yahoo-inc.com>  2020-04-28 08:24:57 +0000
committer Henning Baldersheim <balder@yahoo-inc.com>  2020-04-29 14:14:03 +0000
commit    eb7b71781ca079b5577a13b300beafee388bc1ce (patch)
tree      8a5194ed759a8fc8433fef14118e67ac6bfc2632 /storage
parent    9499865f8a43aa097841606795a2bea8d0273ef9 (diff)
- Add async interface to put.
- Use MessageTracker for keeping context.
- Implement putAsync, but still use it synchronously.
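The diff below changes the persistence handler signatures from handleX(Command&, spi::Context&) to handleX(Command&, MessageTracker::UP): the caller now creates the tracker (carrying the bucket lock, the command, and the spi::Context), and the handler only tags it with the appropriate metric via setMetric() and reads the context through tracker->context() before returning the tracker with its reply. The standalone C++ sketch below illustrates that calling pattern with simplified stand-in types; Context, Metric, MessageTracker and handlePut here are illustrative placeholders, not the real Vespa classes.

// Minimal sketch (not Vespa code) of the handler-signature change in this commit:
// the handler no longer receives a separate spi::Context& and no longer builds its
// own MessageTracker; it receives a tracker that already owns the context.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Context {};                       // stand-in for spi::Context

struct Metric { std::string name; };     // stand-in for a FileStor metric

class MessageTracker {                   // stand-in for storage::MessageTracker
public:
    using UP = std::unique_ptr<MessageTracker>;
    explicit MessageTracker(std::string cmdName) : _cmdName(std::move(cmdName)) {}
    void setMetric(const Metric& m) { _metric = &m; }   // handler tags the metric
    Context& context() { return _context; }             // context now lives in the tracker
    const std::string& cmdName() const { return _cmdName; }
private:
    std::string   _cmdName;
    Context       _context;
    const Metric* _metric = nullptr;
};

Metric putMetric{"put"};

// Old shape (before this commit):  MessageTracker::UP handlePut(PutCommand&, spi::Context&);
// New shape (after this commit):   the tracker carries the context and the reply.
MessageTracker::UP handlePut(const std::string& doc, MessageTracker::UP tracker) {
    tracker->setMetric(putMetric);
    Context& ctx = tracker->context();   // was the separate 'context' parameter
    (void) ctx;                          // a real handler would pass ctx on to _spi.put(...)
    std::cout << "handled put of " << doc << " via tracker for "
              << tracker->cmdName() << '\n';
    return tracker;
}

int main() {
    auto tracker = std::make_unique<MessageTracker>("PutCommand");
    handlePut("id:test:doc::1", std::move(tracker));
}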
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.cpp |   8
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.h   |   2
-rw-r--r--  storage/src/tests/persistence/diskmoveoperationhandlertest.cpp      |  10
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp |   7
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp                  | 103
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.cpp              |   2
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.h                |  17
-rw-r--r--  storage/src/tests/persistence/persistencethread_splittest.cpp       |  20
-rw-r--r--  storage/src/tests/persistence/processalltest.cpp                    |  45
-rw-r--r--  storage/src/tests/persistence/testandsettest.cpp                    |  69
-rw-r--r--  storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp  |  24
-rw-r--r--  storage/src/vespa/storage/persistence/diskmoveoperationhandler.h    |   3
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp              |  82
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.h                |   9
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp         | 237
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h           |  47
-rw-r--r--  storage/src/vespa/storage/persistence/persistenceutil.cpp           |  63
-rw-r--r--  storage/src/vespa/storage/persistence/persistenceutil.h             |  39
-rw-r--r--  storage/src/vespa/storage/persistence/processallhandler.cpp         |  37
-rw-r--r--  storage/src/vespa/storage/persistence/processallhandler.h           |  11
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.cpp    |   7
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.h      |   2
22 files changed, 403 insertions, 441 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index fb80c25bfb7..862d1fb758a 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -91,14 +91,12 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const
}
spi::Result
-PersistenceProviderWrapper::put(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const document::Document::SP& doc,
- spi::Context& context)
+PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp,
+ document::Document::SP doc, spi::Context& context)
{
LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")");
CHECK_ERROR(spi::Result, FAIL_PUT);
- return _spi.put(bucket, timestamp, doc, context);
+ return _spi.put(bucket, timestamp, std::move(doc), context);
}
spi::RemoveResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 9bd3653e8a1..25365e64bfc 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -88,7 +88,7 @@ public:
spi::PartitionStateListResult getPartitionStates() const override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace, spi::PartitionId) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override;
+ spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
diff --git a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
index 0dd3285e5f3..c10681405e7 100644
--- a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
+++ b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
@@ -27,13 +27,11 @@ TEST_F(DiskMoveOperationHandlerTest, simple) {
doPutOnDisk(3, 4, spi::Timestamp(1000 + i));
}
- DiskMoveOperationHandler diskMoveHandler(
- getEnv(3),
- getPersistenceProvider());
- BucketDiskMoveCommand move(makeDocumentBucket(document::BucketId(16, 4)), 3, 4);
-
+ DiskMoveOperationHandler diskMoveHandler(getEnv(3),getPersistenceProvider());
+ document::Bucket bucket = makeDocumentBucket(document::BucketId(16, 4));
+ auto move = std::make_shared<BucketDiskMoveCommand>(bucket, 3, 4);
spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- diskMoveHandler.handleBucketDiskMove(move, context);
+ diskMoveHandler.handleBucketDiskMove(*move, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), move));
EXPECT_EQ("BucketId(0x4000000000000004): 10,4",
getBucketStatus(document::BucketId(16,4)));
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index ba344971c3b..e9f878bfe1e 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -46,13 +46,8 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp,
- const document::Document::SP& doc, spi::Context& context) override
+ spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override
{
- (void) bucket;
- (void) timestamp;
- (void) doc;
- (void) context;
_queueBarrier.await();
// message abort stage with active opertion in disk queue
std::this_thread::sleep_for(75ms);
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index dffd4ef1768..035da326d48 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -149,6 +149,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
};
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
+ }
+
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -203,9 +208,9 @@ TEST_F(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- cmd.setSourceIndex(1234);
- MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ cmd->setSourceIndex(1234);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
LOG(debug, "Check state");
ASSERT_EQ(1, messageKeeper()._msgs.size());
@@ -217,7 +222,7 @@ TEST_F(MergeHandlerTest, merge_bucket_command) {
EXPECT_EQ(1, cmd2.getAddress()->getIndex());
EXPECT_EQ(1234, cmd2.getSourceIndex());
- tracker->generateReply(cmd);
+ tracker->generateReply(*cmd);
EXPECT_FALSE(tracker->hasReply());
}
@@ -228,8 +233,8 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Verifying that get bucket diff is sent on");
- api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
- MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context);
+ auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+ MessageTracker::UP tracker1 = handler.handleGetBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
if (midChain) {
@@ -277,8 +282,8 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Verifying that apply bucket diff is sent on");
- api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
- MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context);
+ auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+ MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
if (midChain) {
@@ -324,9 +329,9 @@ TEST_F(MergeHandlerTest, master_message_flow) {
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
LOG(debug, "Check state");
ASSERT_EQ(1, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
@@ -425,8 +430,8 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -505,9 +510,8 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
applyBucketDiffCmd->getDiff() = applyDiff;
- MergeHandler handler(
- getPersistenceProvider(), getEnv(), maxChunkSize);
- handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
+ handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
// Should not fill up more than chunk size allows for
@@ -520,8 +524,8 @@ TEST_F(MergeHandlerTest, max_timestamp) {
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
@@ -534,8 +538,7 @@ MergeHandlerTest::fillDummyApplyDiff(
std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
{
document::TestDocMan docMan;
- document::Document::SP doc(
- docMan.createRandomDocumentAtLocation(_location));
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location));
std::vector<char> headerBlob;
{
vespalib::nbostream stream;
@@ -638,7 +641,8 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
providerWrapper.clearOperationLog();
try {
- handler.handleApplyBucketDiff(*createDummyApplyDiff(6000), *_context);
+ auto cmd = createDummyApplyDiff(6000);
+ handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
@@ -648,15 +652,15 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler(getPersistenceProvider(), getEnv());
// Send merge for unknown bucket
- api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
- MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -682,8 +686,8 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
_nodes.emplace_back(0, false);
_nodes.emplace_back(1, false);
_nodes.emplace_back(2, false);
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -722,7 +726,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
applyBucketDiffCmd->getDiff() = applyDiff;
- auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
ASSERT_TRUE(applyBucketDiffReply.get());
@@ -809,10 +813,10 @@ void
MergeHandlerTest::HandleMergeBucketInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
@@ -841,17 +845,16 @@ void
MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context& )
{
- api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleGetBucketDiff(cmd, context);
+ auto cmd = std::make_shared<api::GetBucketDiffCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler(providerWrapper, getEnv());
- providerWrapper.setResult(
- spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
+ providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
@@ -874,12 +877,11 @@ void
MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
++_counter;
- std::shared_ptr<api::ApplyBucketDiffCommand> cmd(
- test.createDummyApplyDiff(100000 * _counter));
- handler.handleApplyBucketDiff(*cmd, context);
+ auto cmd = test.createDummyApplyDiff(100000 * _counter);
+ handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
@@ -904,8 +906,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
// Casual, in-place testing of bug 6752085.
// This will fail if we give NaN to the metric in question.
- EXPECT_TRUE(std::isfinite(getEnv()._metrics
- .mergeAverageDataReceivedNeeded.getLast()));
+ EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast()));
}
}
@@ -913,10 +914,10 @@ void
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
_diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
}
@@ -974,13 +975,13 @@ void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
++_counter;
_stub.clear();
if (getChainPos() == FRONT) {
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4);
diffCmd->getDiff() = dummyDiff->getDiff();
@@ -995,7 +996,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
// Pretend last node in chain has data and that it will be fetched when
// chain is unwinded.
auto cmd = test.createDummyApplyDiff(100000 * _counter, 0x4, false);
- handler.handleApplyBucketDiff(*cmd, context);
+ handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
_applyCmd = test.fetchSingleMessage<api::ApplyBucketDiffCommand>();
}
}
@@ -1147,13 +1148,13 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
applyBucketDiffCmd->getDiff() = applyDiff;
- auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
ASSERT_TRUE(applyBucketDiffReply.get());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 25c0a36a7f5..4ac9dfd7765 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -283,7 +283,7 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc,
spi::Context context(defaultLoadType, spi::Priority(0),
spi::Trace::TraceLevel(0));
getPersistenceProvider().createBucket(b, context);
- getPersistenceProvider().put(b, time, doc, context);
+ getPersistenceProvider().put(b, time, std::move(doc), context);
}
spi::UpdateResult
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index e418765ecac..3121bef61e5 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -37,6 +37,23 @@ struct PersistenceTestEnvironment {
class PersistenceTestUtils : public testing::Test {
public:
+ class NoBucketLock : public FileStorHandler::BucketLockInterface
+ {
+ public:
+ NoBucketLock(document::Bucket bucket) : _bucket(bucket) { }
+ const document::Bucket &getBucket() const override {
+ return _bucket;
+ }
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
+ static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) {
+ return std::make_shared<NoBucketLock>(bucket);
+ }
+ private:
+ document::Bucket _bucket;
+ };
+
std::unique_ptr<PersistenceTestEnvironment> _env;
PersistenceTestUtils();
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index 98a2be6880d..1ec6a35fb1d 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -201,19 +201,20 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
}
document::Document::SP doc(testDocMan.createRandomDocumentAtLocation(
docloc, seed, docSize, docSize));
- spi.put(bucket, spi::Timestamp(1000 + i), doc, context);
+ spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context);
}
std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0));
getNode().getStateUpdater().setClusterState(
std::make_shared<lib::ClusterState>("distributor:1 storage:1"));
- api::SplitBucketCommand cmd(makeDocumentBucket(document::BucketId(currentSplitLevel, 1)));
- cmd.setMaxSplitBits(maxBits);
- cmd.setMinSplitBits(minBits);
- cmd.setMinByteSize(maxSize);
- cmd.setMinDocCount(maxCount);
- cmd.setSourceIndex(0);
- MessageTracker::UP result(thread->handleSplitBucket(cmd, context));
+ document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1));
+ auto cmd = std::make_shared<api::SplitBucketCommand>(docBucket);
+ cmd->setMaxSplitBits(maxBits);
+ cmd->setMinSplitBits(minBits);
+ cmd->setMinByteSize(maxSize);
+ cmd->setMinDocCount(maxCount);
+ cmd->setSourceIndex(0);
+ MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
@@ -222,8 +223,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
auto& reply = dynamic_cast<api::SplitBucketReply&>(result->getReply());
std::set<std::string> expected;
for (uint32_t i=0; i<resultBuckets; ++i) {
- document::BucketId b(resultSplitLevel,
- location | (i == 0 ? 0 : splitMask));
+ document::BucketId b(resultSplitLevel, location | (i == 0 ? 0 : splitMask));
std::ostringstream ost;
ost << b << " - " << b.getUsedBits();
expected.insert(ost.str());
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 5462b4a5b0a..0d482ebe5b7 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -20,10 +20,10 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
doPut(4, spi::Timestamp(1234));
doPut(4, spi::Timestamp(2345));
- api::RemoveLocationCommand removeLocation("id.user == 4", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- auto tracker = handler.handleRemoveLocation(removeLocation, context);
+ auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n",
@@ -45,10 +45,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::RemoveLocationCommand
- removeLocation("testdoctype1.headerval % 2 == 0", makeDocumentBucket(bucketId));
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- auto tracker = handler.handleRemoveLocation(removeLocation, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
+ auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
@@ -71,12 +70,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
- api::RemoveLocationCommand
- removeLocation("unknowndoctype.headerval % 2 == 0", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -86,11 +84,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
- api::RemoveLocationCommand removeLocation("id.bogus != badgers", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -107,10 +105,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId),
- "testdoctype1.headerval % 2 == 0");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -142,9 +139,9 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries)
true);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -188,9 +185,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 08555fe0627..864ab320527 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -29,6 +29,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
const document::StringFieldValue MATCHING_HEADER{"Some string with woofy dog as a substring"};
const document::StringFieldValue OLD_CONTENT{"Some old content"};
const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"};
+ const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID);
unique_ptr<PersistenceThread> thread;
shared_ptr<document::Document> testDoc;
@@ -39,6 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
: context(spi::LoadType(0, "default"), 0, 0)
{}
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
+ }
+
void SetUp() override {
SingleDiskPersistenceTestUtils::SetUp();
@@ -57,7 +63,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
SingleDiskPersistenceTestUtils::TearDown();
}
- std::unique_ptr<api::UpdateCommand> conditional_update_test(
+ std::shared_ptr<api::UpdateCommand> conditional_update_test(
bool createIfMissing,
api::Timestamp updateTimestamp);
@@ -82,10 +88,10 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
// Conditionally replace document, but fail due to lack of woofy dog
api::Timestamp timestampTwo = 1;
- api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
- setTestCondition(putTwo);
+ auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
+ setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -102,10 +108,10 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
// Conditionally replace document with updated version, succeed in doing so
api::Timestamp timestampTwo = 1;
- api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
- setTestCondition(putTwo);
+ auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
+ setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -122,10 +128,10 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
// Conditionally remove document, fail in doing so
api::Timestamp timestampTwo = 1;
- api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
- setTestCondition(remove);
+ auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
+ setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -142,18 +148,17 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
// Conditionally remove document, succeed in doing so
api::Timestamp timestampTwo = 1;
- api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
- setTestCondition(remove);
+ auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
+ setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
}
-std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test(
- bool createIfMissing,
- api::Timestamp updateTimestamp)
+std::shared_ptr<api::UpdateCommand>
+TestAndSetTest::conditional_update_test(bool createIfMissing, api::Timestamp updateTimestamp)
{
auto docUpdate = std::make_shared<document::DocumentUpdate>(_env->_testDocMan.getTypeRepo(), testDoc->getType(), testDocId);
auto fieldUpdate = document::FieldUpdate(testDoc->getField("content"));
@@ -161,7 +166,7 @@ std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test(
docUpdate->addUpdate(fieldUpdate);
docUpdate->setCreateIfNonExistent(createIfMissing);
- auto updateUp = std::make_unique<api::UpdateCommand>(makeDocumentBucket(BUCKET_ID), docUpdate, updateTimestamp);
+ auto updateUp = std::make_unique<api::UpdateCommand>(BUCKET, docUpdate, updateTimestamp);
setTestCondition(*updateUp);
return updateUp;
}
@@ -172,7 +177,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -185,7 +190,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -197,7 +202,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -206,7 +211,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -215,10 +220,10 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
// Conditionally replace nonexisting document
// Fail early since document selection is invalid
api::Timestamp timestamp = 0;
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- put.setCondition(documentapi::TestAndSetCondition("bjarne"));
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ put->setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -226,11 +231,11 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
// Conditionally replace nonexisting document
// Fail since no document exists to match with test and set
api::Timestamp timestamp = 0;
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- setTestCondition(put);
- thread->handlePut(put, context);
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ setTestCondition(*put);
+ thread->handlePut(*put, createTracker(put, BUCKET));
- ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -253,8 +258,8 @@ TestAndSetTest::createTestDocument()
document::Document::SP TestAndSetTest::retrieveTestDocument()
{
- api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]");
- auto tracker = thread->handleGet(get, context);
+ auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, "[all]");
+ auto tracker = thread->handleGet(*get, createTracker(get, BUCKET));
assert(tracker->getResult() == api::ReturnCode::Result::OK);
auto & reply = static_cast<api::GetReply &>(tracker->getReply());
@@ -273,8 +278,8 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
testDoc->setValue(testDoc->getField("hstringval"), MATCHING_HEADER);
}
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- thread->handlePut(put, context);
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ thread->handlePut(*put, createTracker(put, BUCKET));
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
index e906cfc624f..18877bdf8f7 100644
--- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
+++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
@@ -7,20 +7,16 @@ LOG_SETUP(".persistence.diskmoveoperationhandler");
namespace storage {
-DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env,
- spi::PersistenceProvider& provider)
+DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, spi::PersistenceProvider& provider)
: _env(env),
_provider(provider)
{
}
MessageTracker::UP
-DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
- spi::Context& context)
+DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.movedBuckets,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.movedBuckets);
document::Bucket bucket(cmd.getBucket());
uint32_t targetDisk(cmd.getDstDisk());
@@ -46,13 +42,10 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
deviceIndex, targetDisk);
spi::Bucket from(bucket, spi::PartitionId(deviceIndex));
- spi::Bucket to(bucket, spi::PartitionId(targetDisk));
- spi::Result result(
- _provider.move(from, spi::PartitionId(targetDisk), context));
+ spi::Result result(_provider.move(from, spi::PartitionId(targetDisk), tracker->context()));
if (result.hasError()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE,
- result.getErrorMessage());
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, result.getErrorMessage());
return tracker;
}
@@ -82,12 +75,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
}
// Answer message, setting extra info such as filesize
- tracker->setReply(std::shared_ptr<BucketDiskMoveReply>(
- new BucketDiskMoveReply(
- cmd,
- bInfo,
- sourceFileSize,
- sourceFileSize)));
+ tracker->setReply(std::make_shared<BucketDiskMoveReply>(cmd, bInfo, sourceFileSize, sourceFileSize));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
index f0c4bbef66a..9e8d33fc802 100644
--- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
+++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
@@ -12,8 +12,7 @@ public:
DiskMoveOperationHandler(PersistenceUtil&,
spi::PersistenceProvider& provider);
- MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&,
- spi::Context&);
+ MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, MessageTracker::UP tracker);
private:
PersistenceUtil& _env;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4cb687bb753..3483b15dd0e 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -537,9 +537,10 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
- checkResult(_spi.put(bucket, timestamp, doc, context),
+ DocumentId docId = doc->getId();
+ checkResult(_spi.put(bucket, timestamp, std::move(doc), context),
bucket,
- doc->getId(),
+ docId,
"put");
} else {
DocumentId docId(e._docName);
@@ -903,12 +904,9 @@ public:
};
MessageTracker::UP
-MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
- spi::Context& context)
+MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.mergeBuckets,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.mergeBuckets);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".",
@@ -949,44 +947,33 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
tracker->fail(ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
+ checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
- MergeStatus::SP s = MergeStatus::SP(new MergeStatus(
+ auto s = std::make_shared<MergeStatus>(
_env._component.getClock(), cmd.getLoadType(),
- cmd.getPriority(), cmd.getTrace().getLevel()));
+ cmd.getPriority(), cmd.getTrace().getLevel());
_env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->nodeList = cmd.getNodes();
s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
s->timeout = cmd.getTimeout();
s->startTime = framework::MilliSecTimer(_env._component.getClock());
- std::shared_ptr<api::GetBucketDiffCommand> cmd2(
- new api::GetBucketDiffCommand(bucket.getBucket(),
- s->nodeList,
- s->maxTimestamp.getTime()));
- if (!buildBucketInfoList(bucket,
- cmd.getLoadType(),
- s->maxTimestamp,
- 0,
- cmd2->getDiff(),
- context))
- {
+ auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), s->nodeList, s->maxTimestamp.getTime());
+ if (!buildBucketInfoList(bucket, cmd.getLoadType(), s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) {
LOG(debug, "Bucket non-existing in db. Failing merge.");
tracker->fail(ReturnCode::BUCKET_DELETED,
"Bucket not found in buildBucketInfo step");
return tracker;
}
- _env._metrics.mergeMetadataReadLatency.addValue(
- s->startTime.getElapsedTimeAsDouble());
+ _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble());
LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u "
"with diff of %u entries.",
cmd2->getMsgId(),
bucket.toString().c_str(),
s->nodeList[1].index,
uint32_t(cmd2->getDiff().size()));
- cmd2->setAddress(createAddress(_env._component.getClusterName(),
- s->nodeList[1].index));
+ cmd2->setAddress(createAddress(_env._component.getClusterName(), s->nodeList[1].index));
cmd2->setPriority(s->context.getPriority());
cmd2->setTimeout(s->timeout);
cmd2->setSourceIndex(cmd.getSourceIndex());
@@ -1129,15 +1116,12 @@ namespace {
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
- spi::Context& context)
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.getBucketDiff,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
+ checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
@@ -1151,7 +1135,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
framework::MilliSecTimer startTime(_env._component.getClock());
if (!buildBucketInfoList(bucket, cmd.getLoadType(),
Timestamp(cmd.getMaxTimestamp()),
- index, local, context))
+ index, local, tracker->context()))
{
LOG(debug, "Bucket non-existing in db. Failing merge.");
tracker->fail(ReturnCode::BUCKET_DELETED,
@@ -1186,31 +1170,26 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
cmd.getMsgId(), bucket.toString().c_str(),
cmd.getNodes()[index - 1].index, final.size(), local.size());
- api::GetBucketDiffReply* reply = new api::GetBucketDiffReply(cmd);
- tracker->setReply(api::StorageReply::SP(reply));
+ auto reply = std::make_shared<api::GetBucketDiffReply>(cmd);
reply->getDiff().swap(final);
+ tracker->setReply(std::move(reply));
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
- MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
+ auto s = std::make_shared<MergeStatus>(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
- cmd.getTrace().getLevel()));
+ cmd.getTrace().getLevel());
_env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
- s->pendingGetDiff =
- api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd));
+ s->pendingGetDiff = std::make_shared<api::GetBucketDiffReply>(cmd);
s->pendingGetDiff->setPriority(cmd.getPriority());
- LOG(spam, "Sending GetBucketDiff for %s on to node %d, "
- "added %zu new entries to diff.",
+ LOG(spam, "Sending GetBucketDiff for %s on to node %d, added %zu new entries to diff.",
bucket.toString().c_str(), cmd.getNodes()[index + 1].index,
local.size() - remote.size());
- std::shared_ptr<api::GetBucketDiffCommand> cmd2(
- new api::GetBucketDiffCommand(
- bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp()));
- cmd2->setAddress(createAddress(_env._component.getClusterName(),
- cmd.getNodes()[index + 1].index));
+ auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp());
+ cmd2->setAddress(createAddress(_env._component.getClusterName(), cmd.getNodes()[index + 1].index));
cmd2->getDiff().swap(local);
cmd2->setPriority(cmd.getPriority());
cmd2->setTimeout(cmd.getTimeout());
@@ -1330,10 +1309,9 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
MessageTracker::UP
-MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
- spi::Context& context)
+MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.applyBucketDiff, _env._component.getClock());
+ tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "%s", cmd.toString().c_str());
@@ -1348,10 +1326,8 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
bool lastInChain = index + 1u >= cmd.getNodes().size();
if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) {
framework::MilliSecTimer startTime(_env._component.getClock());
- fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index,
- context);
- _env._metrics.mergeDataReadLatency.addValue(
- startTime.getElapsedTimeAsDouble());
+ fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context());
+ _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Moving %zu entries, didn't need "
"local data on node %u (%u).",
@@ -1363,7 +1339,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_env._component.getClock());
api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(),
- cmd.getDiff(), index, context));
+ cmd.getDiff(), index, tracker->context()));
_env._metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
} else {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 4ea69bd0fdf..7052258ec03 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -55,13 +55,10 @@ public:
uint8_t nodeIndex,
spi::Context& context);
- MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&,
- spi::Context&);
- MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&,
- spi::Context&);
+ MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP);
+ MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP);
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&);
- MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&,
- spi::Context&);
+ MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP);
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&);
private:
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 422e19a492e..5905d93cc83 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -69,7 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke
bool
PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker)
{
- uint32_t code = _env.convertErrorCode(response);
+ uint32_t code = PersistenceUtil::convertErrorCode(response);
if (code != 0) {
tracker.fail(code, response.getErrorMessage());
@@ -80,11 +80,13 @@ PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tr
}
-bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
+bool
+PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
return cmd.getCondition().isPresent();
}
-bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
+bool
+PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch) {
try {
TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch);
@@ -104,35 +106,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
}
MessageTracker::UP
-PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context)
+PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.put[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
return tracker;
}
spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context);
+ spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context());
checkForError(response, *tracker);
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context)
+PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.remove[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
return tracker;
}
spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context());
if (checkForError(response, *tracker)) {
tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -143,18 +145,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.update[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) {
return tracker;
}
spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context());
if (checkForError(response, *tracker)) {
auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
@@ -176,17 +178,16 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co
}
MessageTracker::UP
-PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context)
+PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.get[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
- context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
+ document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
+ tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context());
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
@@ -199,9 +200,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
+PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock());
+ tracker->setMetric(_env._metrics.repairs);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
@@ -225,28 +226,28 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context)
+PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
+ tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]);
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
for (const api::Timestamp & token : tokens) {
- spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context);
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context());
}
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.createBuckets);
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
}
spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- _spi.createBucket(spiBucket, context);
+ _spi.createBucket(spiBucket, tracker->context());
if (cmd.getActive()) {
_spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
}
@@ -295,9 +296,9 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con
}
MessageTracker::UP
-PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.deleteBuckets);
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
@@ -308,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
return tracker;
}
- _spi.deleteBucket(bucket, context);
+ _spi.deleteBucket(bucket, tracker->context());
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
@@ -333,12 +334,12 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex
}
MessageTracker::UP
-PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context)
+PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context));
+ tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]);
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context()));
if (checkForError(result, *tracker)) {
- GetIterReply::SP reply(new GetIterReply(cmd));
+ auto reply = std::make_shared<GetIterReply>(cmd);
reply->getEntries() = result.steal_entries();
_env._metrics.visit[cmd.getLoadType()].
documentsPerIterate.addValue(reply->getEntries().size());
@@ -351,9 +352,9 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
+PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock());
+ tracker->setMetric(_env._metrics.readBucketList);
spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition()));
if (checkForError(result, *tracker)) {
@@ -366,23 +367,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
}
MessageTracker::UP
-PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
+PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock());
+ tracker->setMetric(_env._metrics.readBucketInfo);
_env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context)
+PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
- document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
- context.setReadConsistency(cmd.getReadConsistency());
+ tracker->setMetric(_env._metrics.createIterator);
+ document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFields());
+ tracker->context().setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), tracker->context()));
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
@@ -390,9 +390,9 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context
}
MessageTracker::UP
-PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.splitBuckets);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
// Calculate the various bucket ids involved.
@@ -411,7 +411,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
SplitBitDetector::Result targetInfo;
if (_env._config.enableMultibitSplitOptimalization) {
targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
- context, cmd.getMinDocCount(), cmd.getMinByteSize());
+ tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize());
}
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
@@ -451,9 +451,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
}
#endif
spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)),
- spi::Bucket(target2, spi::PartitionId(lock2.disk)), context);
+ spi::Bucket(target2, spi::PartitionId(lock2.disk)), tracker->context());
if (result.hasError()) {
- tracker->fail(_env.convertErrorCode(result), result.getErrorMessage());
+ tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage());
return tracker;
}
// After split we need to take all bucket db locks to update them.
@@ -509,7 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
spi::PartitionId(targets[i].second.diskIndex)));
LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
createTarget.toString().c_str());
- _spi.createBucket(createTarget, context);
+ _spi.createBucket(createTarget, tracker->context());
}
splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo());
@@ -529,7 +529,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
}
bool
-PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const
+PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker)
{
if (cmd.getSourceBuckets().size() != 2) {
tracker.fail(ReturnCode::ILLEGAL_PARAMETERS,
@@ -554,9 +554,9 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa
}
MessageTracker::UP
-PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context)
+PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.joinBuckets);
if (!validateJoinCommand(cmd, *tracker)) {
return tracker;
}
@@ -603,7 +603,7 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context
_spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)),
spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)),
spi::Bucket(destBucket, spi::PartitionId(_env._partition)),
- context);
+ tracker->context());
if (!checkForError(result, *tracker)) {
return tracker;
}
@@ -634,9 +634,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context
}
MessageTracker::UP
-PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
+PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock());
+ tracker->setMetric(_env._metrics.setBucketStates);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
@@ -665,9 +665,9 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context)
+PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock());
+ tracker->setMetric(_env._metrics.internalJoin);
document::Bucket destBucket = cmd.getBucket();
{
// Create empty bucket for target.
@@ -682,7 +682,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi:
_spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())),
- context);
+ tracker->context());
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket())));
}
@@ -690,9 +690,9 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi:
}
MessageTracker::UP
-PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
+PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock());
+ tracker->setMetric(_env._metrics.recheckBucketInfo);
document::Bucket bucket(cmd.getBucket());
api::BucketInfo info(_env.getBucketInfo(bucket));
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
@@ -720,58 +720,58 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context)
+PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker)
{
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
- return handleGet(static_cast<api::GetCommand&>(msg), context);
+ return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
case api::MessageType::PUT_ID:
- return handlePut(static_cast<api::PutCommand&>(msg), context);
+ return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVE_ID:
- return handleRemove(static_cast<api::RemoveCommand&>(msg), context);
+ return handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
case api::MessageType::UPDATE_ID:
- return handleUpdate(static_cast<api::UpdateCommand&>(msg), context);
+ return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
case api::MessageType::REVERT_ID:
- return handleRevert(static_cast<api::RevertCommand&>(msg), context);
+ return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context);
+ return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context);
+ return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
- return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context);
+ return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
- return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context);
+ return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
// Depends on iterators
case api::MessageType::STATBUCKET_ID:
- return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context);
+ return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context);
+ return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
case api::MessageType::MERGEBUCKET_ID:
- return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context);
+ return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker));
case api::MessageType::GETBUCKETDIFF_ID:
- return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context);
+ return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::APPLYBUCKETDIFF_ID:
- return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context);
+ return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg));
+ return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
- return handleGetIter(static_cast<GetIterCommand&>(msg), context);
+ return handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
case CreateIteratorCommand::ID:
- return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context);
+ return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
case ReadBucketList::ID:
- return handleReadBucketList(static_cast<ReadBucketList&>(msg));
+ return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
case ReadBucketInfo::ID:
- return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg));
+ return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
case RepairBucketCommand::ID:
- return handleRepairBucket(static_cast<RepairBucketCommand&>(msg));
+ return handleRepairBucket(static_cast<RepairBucketCommand&>(msg), std::move(tracker));
case BucketDiskMoveCommand::ID:
- return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context);
+ return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), std::move(tracker));
case InternalBucketJoinCommand::ID:
- return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context);
+ return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
- return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg));
+ return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
default:
LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
break;
@@ -782,21 +782,6 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Conte
return MessageTracker::UP();
}
-MessageTracker::UP
-PersistenceThread::handleCommand(api::StorageCommand& msg)
-{
- spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel());
- MessageTracker::UP mtracker(handleCommandSplitByType(msg, context));
- if (mtracker && ! context.getTrace().getRoot().isEmpty()) {
- if (mtracker->hasReply()) {
- mtracker->getReply().getTrace().getRoot().addChild(context.getTrace().getRoot());
- } else {
- msg.getTrace().getRoot().addChild(context.getTrace().getRoot());
- }
- }
- return mtracker;
-}
-
void
PersistenceThread::handleReply(api::StorageReply& reply)
{
@@ -813,7 +798,7 @@ PersistenceThread::handleReply(api::StorageReply& reply)
}
MessageTracker::UP
-PersistenceThread::processMessage(api::StorageMessage& msg)
+PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker)
{
MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer");
@@ -829,13 +814,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
}
} else {
api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg);
-
try {
int64_t startTime(_component->getClock().getTimeInMillis().getTime());
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- auto tracker(handleCommand(initiatingCommand));
+ tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker));
if (!tracker) {
LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str());
} else {
@@ -867,47 +851,18 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
}
}
- return MessageTracker::UP();
-}
-
-namespace {
-
-
-bool isBatchable(api::MessageType::Id id)
-{
- return (id == api::MessageType::PUT_ID ||
- id == api::MessageType::REMOVE_ID ||
- id == api::MessageType::UPDATE_ID ||
- id == api::MessageType::REVERT_ID);
-}
-
-bool hasBucketInfo(api::MessageType::Id id)
-{
- return (isBatchable(id) ||
- (id == api::MessageType::REMOVELOCATION_ID ||
- id == api::MessageType::JOINBUCKETS_ID));
-}
-
+ return tracker;
}
void
-PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) {
- std::vector<MessageTracker::UP> trackers;
- document::Bucket bucket = lock.first->getBucket();
-
+PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) {
LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
api::StorageMessage & msg(*lock.second);
- std::unique_ptr<MessageTracker> tracker = processMessage(msg);
- if (tracker && tracker->hasReply()) {
- if (hasBucketInfo(msg.getType().getId())) {
- if (tracker->getReply().getResult().success()) {
- _env.setBucketInfo(*tracker, bucket);
- }
- }
- LOG(spam, "Sending reply up: %s %" PRIu64,
- tracker->getReply().toString().c_str(), tracker->getReply().getMsgId());
- _env._fileStorHandler.sendReply(std::move(*tracker).stealReplySP());
+ auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second));
+ tracker = processMessage(msg, std::move(tracker));
+ if (tracker) {
+ tracker->sendReply();
}
}
@@ -922,7 +877,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId));
if (lock.first) {
- processLockedMessage(lock);
+ processLockedMessage(std::move(lock));
}
vespalib::MonitorGuard flushMonitorGuard(_flushMonitor);
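
The hunks above replace the per-handler spi::Context parameter with a MessageTracker that is created once per locked message and passed down by ownership; each handler selects its metric with setMetric() and reads the context from the tracker instead of a separate argument. Below is a minimal, self-contained sketch of that calling pattern. Context, OpMetric, PutCommand and MessageTracker here are simplified stand-ins, not the real storage or spi classes.

// Simplified stand-ins, not the real Vespa classes, illustrating the
// "tracker owns the context and metric" calling pattern.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Context { int traceLevel; };       // stands in for spi::Context
struct OpMetric { int count = 0; };       // stands in for a per-operation metric

class MessageTracker {
public:
    using UP = std::unique_ptr<MessageTracker>;
    explicit MessageTracker(int traceLevel) : _context{traceLevel} {}
    void setMetric(OpMetric& m) { ++m.count; _metric = &m; }   // handler selects its metric
    Context& context() { return _context; }                    // context now lives in the tracker
    void sendReply() { std::cout << "reply sent\n"; }          // reply delivery is centralized
private:
    Context   _context;
    OpMetric* _metric = nullptr;
};

struct PutCommand { std::string docId; };

// Handler signature after the refactor: the tracker is taken and returned by ownership.
MessageTracker::UP handlePut(PutCommand& cmd, MessageTracker::UP tracker, OpMetric& putMetric) {
    tracker->setMetric(putMetric);
    // ... the actual put would use tracker->context() instead of a separate parameter ...
    std::cout << "put " << cmd.docId << " (trace level " << tracker->context().traceLevel << ")\n";
    return tracker;
}

int main() {
    OpMetric putMetric;
    PutCommand cmd{"id:test:music::1"};
    auto tracker = std::make_unique<MessageTracker>(0);   // built once by the dispatching thread
    tracker = handlePut(cmd, std::move(tracker), putMetric);
    if (tracker) {
        tracker->sendReply();                             // tracker also owns reply forwarding
    }
}
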
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 56414835b7b..a3c8099f228 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -28,23 +28,23 @@ public:
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context);
- MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context);
- MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRevert(api::RevertCommand& cmd, spi::Context & context);
- MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context);
- MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context);
- MessageTracker::UP handleReadBucketList(ReadBucketList& cmd);
- MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd);
- MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context);
- MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd);
- MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context);
- MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd);
- MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
+ MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker);
private:
uint32_t _stripeId;
@@ -67,23 +67,22 @@ private:
* an appropriate error and returns false iff the command does not validate
* OK. Returns true and does not touch the tracker otherwise.
*/
- bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const;
+ static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker);
// Message handling functions
- MessageTracker::UP handleCommand(api::StorageCommand&);
- MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context);
+ MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker);
void handleReply(api::StorageReply&);
- MessageTracker::UP processMessage(api::StorageMessage& msg);
- void processLockedMessage(FileStorHandler::LockedMessage & lock);
+ MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker);
+ void processLockedMessage(FileStorHandler::LockedMessage lock);
// Thread main loop
void run(framework::ThreadHandle&) override;
- bool checkForError(const spi::Result& response, MessageTracker& tracker);
+ static bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const;
friend class TestAndSetHelper;
- bool tasConditionExists(const api::TestAndSetCommand & cmd);
+ static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false);
};
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 9c49dc96750..53679a1a364 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -14,22 +14,69 @@ namespace {
ost << "PersistenceUtil(" << p << ")";
return ost.str();
}
+
+ bool isBatchable(api::MessageType::Id id)
+ {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID ||
+ id == api::MessageType::REVERT_ID);
+ }
+
+ bool hasBucketInfo(api::MessageType::Id id)
+ {
+ return (isBatchable(id) ||
+ (id == api::MessageType::REMOVELOCATION_ID ||
+ id == api::MessageType::JOINBUCKETS_ID));
+ }
+
}
-MessageTracker::MessageTracker(FileStorThreadMetrics::Op& metric,
- framework::Clock& clock)
+MessageTracker::MessageTracker(PersistenceUtil & env,
+ FileStorHandler::BucketLockInterface::SP bucketLock,
+ api::StorageMessage::SP msg)
: _sendReply(true),
- _metric(metric),
+ _updateBucketInfo(hasBucketInfo(msg->getType().getId())),
+ _bucketLock(std::move(bucketLock)),
+ _msg(std::move(msg)),
+ _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()),
+ _env(env),
+ _metric(nullptr),
_result(api::ReturnCode::OK),
- _timer(clock)
-{
- _metric.count.inc();
+ _timer(_env._component.getClock())
+{ }
+
+void
+MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) {
+ metric.count.inc();
+ _metric = &metric;
}
MessageTracker::~MessageTracker()
{
if (_reply.get() && _reply->getResult().success()) {
- _metric.latency.addValue(_timer.getElapsedTimeAsDouble());
+ _metric->latency.addValue(_timer.getElapsedTimeAsDouble());
+ }
+}
+
+void
+MessageTracker::sendReply() {
+ if (hasReply()) {
+ if ( ! _context.getTrace().getRoot().isEmpty()) {
+ getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ }
+ if (_updateBucketInfo) {
+ if (getReply().getResult().success()) {
+ _env.setBucketInfo(*this, _bucketLock->getBucket());
+ }
+ }
+ LOG(spam, "Sending reply up: %s %" PRIu64,
+ getReply().toString().c_str(), getReply().getMsgId());
+ _env._fileStorHandler.sendReply(std::move(_reply));
+ } else {
+ if ( ! _context.getTrace().getRoot().isEmpty()) {
+ _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ }
}
}
@@ -53,7 +100,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
}
if (!_reply->getResult().success()) {
- _metric.failed.inc();
+ _metric->failed.inc();
LOGBP(debug, "Failed to handle command %s: %s",
cmd.toString().c_str(),
_result.toString().c_str());
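
The new MessageTracker::sendReply() above folds three responsibilities that previously lived in PersistenceThread (trace propagation, the conditional bucket-info refresh for message types that carry bucket info, and forwarding the reply) into one place. A rough sketch of that decision flow, with toy types rather than the real reply and trace classes:

#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Trace { std::string data; bool empty() const { return data.empty(); } };
struct Reply { bool success = false; Trace trace; };

// Toy version of the consolidated reply path: merge the operation trace into the
// reply, refresh bucket info only for bucket-info-carrying messages and only on
// success, then hand the reply off to the sender.
void sendReply(std::unique_ptr<Reply> reply, const Trace& operationTrace, bool carriesBucketInfo) {
    if (!reply) {
        return;   // no reply to forward (the patch attaches the trace back onto the command here)
    }
    if (!operationTrace.empty()) {
        reply->trace.data += operationTrace.data;   // propagate the operation trace to the reply
    }
    if (carriesBucketInfo && reply->success) {
        std::cout << "refreshing bucket info before replying\n";
    }
    std::cout << "forwarding reply (success=" << reply->success << ")\n";
}

int main() {
    auto reply = std::make_unique<Reply>();
    reply->success = true;
    sendReply(std::move(reply), Trace{"op-trace"}, true);   // e.g. a Put carries bucket info
}
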
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index e8e5f947814..c6cb943f0b9 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -13,14 +13,18 @@
namespace storage {
+class PersistenceUtil;
+
class MessageTracker : protected Types {
public:
typedef std::unique_ptr<MessageTracker> UP;
- MessageTracker(FileStorThreadMetrics::Op& metric, framework::Clock& clock);
+ MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
~MessageTracker();
+ void setMetric(FileStorThreadMetrics::Op& metric);
+
/**
* Called by operation handlers to set reply if they need to send a
* non-default reply. They should call this function as soon as they create
@@ -57,23 +61,32 @@ public:
api::ReturnCode getResult() const { return _result; }
+ spi::Context & context() { return _context; }
+
+ void sendReply();
+
private:
- bool _sendReply;
- FileStorThreadMetrics::Op& _metric;
- api::StorageReply::SP _reply;
- api::ReturnCode _result;
- framework::MilliSecTimer _timer;
+ bool _sendReply;
+ bool _updateBucketInfo;
+ FileStorHandler::BucketLockInterface::SP _bucketLock;
+ api::StorageMessage::SP _msg;
+ spi::Context _context;
+ PersistenceUtil &_env;
+ FileStorThreadMetrics::Op *_metric;
+ api::StorageReply::SP _reply;
+ api::ReturnCode _result;
+ framework::MilliSecTimer _timer;
};
struct PersistenceUtil {
vespa::config::content::StorFilestorConfig _config;
- ServiceLayerComponentRegister& _compReg;
- ServiceLayerComponent _component;
- FileStorHandler& _fileStorHandler;
- uint16_t _partition;
- uint16_t _nodeIndex;
- FileStorThreadMetrics& _metrics;
- const document::BucketIdFactory& _bucketFactory;
+ ServiceLayerComponentRegister &_compReg;
+ ServiceLayerComponent _component;
+ FileStorHandler &_fileStorHandler;
+ uint16_t _partition;
+ uint16_t _nodeIndex;
+ FileStorThreadMetrics &_metrics;
+ const document::BucketIdFactory &_bucketFactory;
const std::shared_ptr<const document::DocumentTypeRepo> _repo;
spi::PersistenceProvider& _spi;
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index f37c6723933..4829bdf4581 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -80,25 +80,18 @@ public:
}
MessageTracker::UP
-ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
- spi::Context& context)
+ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(
- _env._metrics.removeLocation[cmd.getLoadType()],
- _env._component.getClock());
+ tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]);
LOG(debug, "RemoveLocation(%s): using selection '%s'",
cmd.getBucketId().toString().c_str(),
cmd.getDocumentSelection().c_str());
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- UnrevertableRemoveEntryProcessor processor(_spi, bucket, context);
- BucketProcessor::iterateAll(_spi,
- bucket,
- cmd.getDocumentSelection(),
- processor,
- spi::NEWEST_DOCUMENT_ONLY,
- context);
+ UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context());
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ processor, spi::NEWEST_DOCUMENT_ONLY, tracker->context());
tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));
@@ -106,12 +99,9 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
}
MessageTracker::UP
-ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd,
- spi::Context& context)
+ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(
- _env._metrics.statBucket[cmd.getLoadType()],
- _env._component.getClock());
+ tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]);
std::ostringstream ost;
ost << "Persistence bucket " << cmd.getBucketId()
@@ -119,15 +109,10 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd,
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
StatEntryProcessor processor(ost);
- BucketProcessor::iterateAll(_spi,
- bucket,
- cmd.getDocumentSelection(),
- processor,
- spi::ALL_VERSIONS,
- context);
-
- api::StatBucketReply::UP reply(new api::StatBucketReply(cmd, ost.str()));
- tracker->setReply(api::StorageReply::SP(reply.release()));
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ processor, spi::ALL_VERSIONS, tracker->context());
+
+ tracker->setReply(std::make_shared<api::StatBucketReply>(cmd, ost.str()));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 37b46ffc728..87c3c63b8fe 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -8,11 +8,7 @@
#include <vespa/storageapi/message/stat.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-namespace document {
-namespace select {
-class Node;
-}
-}
+namespace document::select { class Node; }
namespace storage {
@@ -20,9 +16,8 @@ class ProcessAllHandler : public Types {
public:
ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&);
- MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&,
- spi::Context&);
- MessageTracker::UP handleStatBucket(api::StatBucketCommand&, spi::Context&);
+ MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker);
+ MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker);
protected:
PersistenceUtil& _env;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 76c420e76e6..ed3de5c7873 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -74,12 +74,9 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const
}
spi::Result
-ProviderErrorWrapper::put(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const spi::DocumentSP& doc,
- spi::Context& context)
+ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context)
{
- return checkResult(_impl.put(bucket, ts, doc, context));
+ return checkResult(_impl.put(bucket, ts, std::move(doc), context));
}
spi::RemoveResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 292eb004223..61664419c69 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -47,7 +47,7 @@ public:
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override;
+ spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
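
The put() signature change in the wrappers above (from const spi::DocumentSP& to spi::DocumentSP by value, with std::move at the forwarding call) lets the layer that ultimately stores the document take ownership with a single move instead of an extra reference-count bump, and makes the ownership transfer visible at the call site. A minimal sketch of the idiom with toy types; Document, DocumentSP and Store are placeholders, not the SPI interfaces.

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Document { std::string id; };
using DocumentSP = std::shared_ptr<Document>;

class Store {
public:
    // By-value parameter: the caller either copies (shared ownership retained)
    // or moves (ownership transferred), and the choice is visible at the call site.
    void put(DocumentSP doc) { _docs.push_back(std::move(doc)); }
private:
    std::vector<DocumentSP> _docs;
};

int main() {
    Store store;
    auto doc = std::make_shared<Document>(Document{"id:test:music::1"});
    store.put(std::move(doc));   // transfer ownership, no refcount increment
    std::cout << "caller's pointer is now empty: " << (doc == nullptr) << "\n";
}
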