aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-16 10:47:21 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-16 10:47:21 +0000
commitfdc2e3e8f764974255ebb72d9a4481be6f9e160c (patch)
tree3f5ca6ff7c8bb186aa7abcca8aa7a0cb3f597624 /storage
parent1e7398e49cada7ab43b79752d8fba9a1b72344a1 (diff)
Factor out a handler for async operations to decouple code.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp36
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp24
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp206
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h33
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h26
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp212
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h14
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h10
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp18
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h23
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.h32
-rw-r--r--storage/src/vespa/storage/persistence/types.h3
16 files changed, 377 insertions, 318 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 565d8b5ee3c..40b0d8eb2ba 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -199,7 +199,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
TEST_F(MergeHandlerTest, merge_bucket_command) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -224,7 +224,7 @@ void
MergeHandlerTest::testGetBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
LOG(debug, "Verifying that get bucket diff is sent on");
auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -273,7 +273,7 @@ void
MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
LOG(debug, "Verifying that apply bucket diff is sent on");
auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -320,7 +320,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
TEST_F(MergeHandlerTest, master_message_flow) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -421,7 +421,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
doPut(1234, spi::Timestamp(4000 + i), docSize, docSize);
}
- MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
+ MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -504,7 +504,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
applyBucketDiffCmd->getDiff() = applyDiff;
- MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
+ MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
@@ -516,7 +516,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -624,7 +624,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -644,7 +644,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
}
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -652,7 +652,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
}
TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -675,7 +675,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
}
TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
_nodes.clear();
_nodes.emplace_back(0, false);
_nodes.emplace_back(1, false);
@@ -707,7 +707,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
}
TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
@@ -815,7 +815,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -847,7 +847,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -880,7 +880,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -945,7 +945,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
HandleGetBucketDiffReplyInvoker invoker;
@@ -1036,7 +1036,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
ChainPos pos(i == 0 ? FRONT : MIDDLE);
setUpChain(pos);
invoker.setChainPos(pos);
- MergeHandler handler(providerWrapper, getEnv());
+ MergeHandler handler(getEnv(), providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -1128,7 +1128,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
spi::Timestamp ts(10111);
doPut(doc, ts);
- MergeHandler handler(getPersistenceProvider(), getEnv());
+ MergeHandler handler(getEnv(), getPersistenceProvider());
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index ce041660a2f..e4c5227f951 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -86,7 +86,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -106,7 +106,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -126,7 +126,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -146,7 +146,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
@@ -172,7 +172,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -185,7 +185,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -197,7 +197,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -206,7 +206,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -218,7 +218,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
put->setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -228,9 +228,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
api::Timestamp timestamp = 0;
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
setTestCondition(*put);
- thread->handlePut(*put, createTracker(put, BUCKET));
+ thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET));
- ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(),
+ ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -274,7 +274,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
}
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
- fetchResult(thread->handlePut(*put, createTracker(put, BUCKET)));
+ fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET)));
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 6b57477fa23..15cd0540338 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_spersistence OBJECT
SOURCES
+ asynchandler.cpp
bucketownershipnotifier.cpp
bucketprocessor.cpp
fieldvisitor.cpp
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
new file mode 100644
index 00000000000..418eeb40ffc
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -0,0 +1,206 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "asynchandler.h"
+#include "persistenceutil.h"
+#include "testandsethelper.h"
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+
+namespace storage {
+
+namespace {
+
+class ResultTask : public vespalib::Executor::Task {
+public:
+ ResultTask() : _result(), _resultHandler(nullptr) {}
+
+ void setResult(spi::Result::UP result) {
+ _result = std::move(result);
+ }
+
+ void addResultHandler(const spi::ResultHandler *resultHandler) {
+ // Only handles a signal handler now,
+ // Can be extended if necessary later on
+ assert(_resultHandler == nullptr);
+ _resultHandler = resultHandler;
+ }
+
+ void handle(const spi::Result &result) {
+ if (_resultHandler != nullptr) {
+ _resultHandler->handle(result);
+ }
+ }
+
+protected:
+ spi::Result::UP _result;
+private:
+ const spi::ResultHandler *_resultHandler;
+};
+
+template<class FunctionType>
+class LambdaResultTask : public ResultTask {
+public:
+ explicit LambdaResultTask(FunctionType &&func)
+ : _func(std::move(func))
+ {}
+
+ ~LambdaResultTask() override = default;
+
+ void run() override {
+ handle(*_result);
+ _func(std::move(_result));
+ }
+
+private:
+ FunctionType _func;
+};
+
+template<class FunctionType>
+std::unique_ptr<ResultTask>
+makeResultTask(FunctionType &&function) {
+ return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
+class ResultTaskOperationDone : public spi::OperationComplete {
+public:
+ ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId,
+ std::unique_ptr<ResultTask> task)
+ : _executor(executor),
+ _task(std::move(task)),
+ _executorId(executor.getExecutorId(bucketId.getId()))
+ {
+ }
+ void onComplete(spi::Result::UP result) override {
+ _task->setResult(std::move(result));
+ _executor.executeTask(_executorId, std::move(_task));
+ }
+ void addResultHandler(const spi::ResultHandler * resultHandler) override {
+ _task->addResultHandler(resultHandler);
+ }
+private:
+ vespalib::ISequencedTaskExecutor & _executor;
+ std::unique_ptr<ResultTask> _task;
+ vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
+};
+
+}
+AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
+ vespalib::ISequencedTaskExecutor & executor)
+ : _env(env),
+ _spi(spi),
+ _sequencedExecutor(executor)
+{}
+
+MessageTracker::UP
+AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ MessageTracker & tracker = *trackerUP;
+ auto& metrics = _env._metrics.put[cmd.getLoadType()];
+ tracker.setMetric(metrics);
+ metrics.request_size.addValue(cmd.getApproxByteSize());
+
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
+ // Will also count condition parse failures etc as TaS failures, but
+ // those results _will_ increase the error metrics as well.
+ metrics.test_and_set_failed.inc();
+ return trackerUP;
+ }
+
+ spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket());
+ auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+
+ return trackerUP;
+}
+
+MessageTracker::UP
+AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ MessageTracker & tracker = *trackerUP;
+ auto& metrics = _env._metrics.update[cmd.getLoadType()];
+ tracker.setMetric(metrics);
+ metrics.request_size.addValue(cmd.getApproxByteSize());
+
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) {
+ metrics.test_and_set_failed.inc();
+ return trackerUP;
+ }
+
+ spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket());
+
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker->setReply(std::move(reply));
+ }
+ tracker->sendReply();
+ });
+ _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+MessageTracker::UP
+AsyncHandler::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ MessageTracker & tracker = *trackerUP;
+ auto& metrics = _env._metrics.remove[cmd.getLoadType()];
+ tracker.setMetric(metrics);
+ metrics.request_size.addValue(cmd.getApproxByteSize());
+
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
+ metrics.test_and_set_failed.inc();
+ return trackerUP;
+ }
+
+ spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket());
+
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+ }
+ if (!response.wasFound()) {
+ metrics.notFound.inc();
+ }
+ tracker->sendReply();
+ });
+ _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+bool
+AsyncHandler::tasConditionExists(const api::TestAndSetCommand & cmd) {
+ return cmd.getCondition().isPresent();
+}
+
+bool
+AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
+ spi::Context & context, bool missingDocumentImpliesMatch) const {
+ try {
+ TestAndSetHelper helper(_env, _spi, cmd, missingDocumentImpliesMatch);
+
+ auto code = helper.retrieveAndMatch(context);
+ if (code.failed()) {
+ tracker.fail(code.getResult(), code.getMessage());
+ return false;
+ }
+ } catch (const TestAndSetException & e) {
+ auto code = e.getCode();
+ tracker.fail(code.getResult(), code.getMessage());
+ return false;
+ }
+
+ return true;
+}
+}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
new file mode 100644
index 00000000000..4c525b10152
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "types.h"
+#include <vespa/storageapi/message/persistence.h>
+
+namespace vespalib { class ISequencedTaskExecutor; }
+namespace storage {
+
+namespace spi {
+ struct PersistenceProvider;
+ class Context;
+}
+struct PersistenceUtil;
+
+class AsyncHandler : public Types {
+
+public:
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor);
+ MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
+private:
+ static bool tasConditionExists(const api::TestAndSetCommand & cmd);
+ bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
+ spi::Context & context, bool missingDocumentImpliesMatch = false) const;
+ const PersistenceUtil & _env;
+ spi::PersistenceProvider & _spi;
+ vespalib::ISequencedTaskExecutor & _sequencedExecutor;
+};
+
+} // storage
+
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index acbbe782099..ce033bf6e19 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mergehandler.h"
+#include "persistenceutil.h"
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
@@ -13,16 +14,16 @@ LOG_SETUP(".persistence.mergehandler");
namespace storage {
-MergeHandler::MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil& env)
- : _spi(spi),
- _env(env),
+MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi)
+ : _env(env),
+ _spi(spi),
_maxChunkSize(env._config.bucketMergeChunkSize)
{
}
-MergeHandler::MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil& env, uint32_t maxChunkSize)
- : _spi(spi),
- _env(env),
+MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize)
+ : _env(env),
+ _spi(spi),
_maxChunkSize(maxChunkSize)
{
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 7052258ec03..9e200f62307 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -13,14 +13,18 @@
*/
#pragma once
-#include <vespa/persistence/spi/persistenceprovider.h>
+#include "types.h"
+#include <vespa/persistence/spi/bucket.h>
+#include <vespa/persistence/spi/docentry.h>
#include <vespa/storage/persistence/filestorage/mergestatus.h>
-#include <vespa/storage/persistence/persistenceutil.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/messagesender.h>
namespace storage {
+namespace spi { struct PersistenceProvider; }
+struct PersistenceUtil;
+
class MergeHandler : public Types {
public:
@@ -30,11 +34,9 @@ public:
DELETED_IN_PLACE = 0x04
};
- MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil&);
+ MergeHandler(PersistenceUtil&, spi::PersistenceProvider& spi);
/** Used for unit testing */
- MergeHandler(spi::PersistenceProvider& spi,
- PersistenceUtil& env,
- uint32_t maxChunkSize);
+ MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize);
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -55,16 +57,16 @@ public:
uint8_t nodeIndex,
spi::Context& context);
- MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP);
- MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP);
+ MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP);
+ MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP);
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&);
- MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP);
+ MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP);
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&);
private:
- spi::PersistenceProvider& _spi;
- PersistenceUtil& _env;
- uint32_t _maxChunkSize;
+ PersistenceUtil &_env;
+ spi::PersistenceProvider &_spi;
+ uint32_t _maxChunkSize;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 98270561a33..b6ce91c7ddb 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -7,7 +7,6 @@
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
-#include <vespa/document/update/documentupdate.h>
#include <vespa/document/base/exceptions.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -24,77 +23,9 @@ namespace storage {
namespace {
-class ResultTask : public vespalib::Executor::Task {
-public:
- ResultTask() : _result(), _resultHandler(nullptr) { }
- void setResult(spi::Result::UP result) {
- _result = std::move(result);
- }
- void addResultHandler(const spi::ResultHandler * resultHandler) {
- // Only handles a signal handler now,
- // Can be extended if necessary later on
- assert(_resultHandler == nullptr);
- _resultHandler = resultHandler;
- }
- void handle(const spi::Result &result ) {
- if (_resultHandler != nullptr) {
- _resultHandler->handle(result);
- }
- }
-protected:
- spi::Result::UP _result;
-private:
- const spi::ResultHandler * _resultHandler;
-};
-
-template<class FunctionType>
-class LambdaResultTask : public ResultTask {
-public:
- explicit LambdaResultTask(FunctionType && func)
- : _func(std::move(func))
- {}
- ~LambdaResultTask() override = default;
- void run() override {
- handle(*_result);
- _func(std::move(_result));
- }
-private:
- FunctionType _func;
-};
-
-template <class FunctionType>
-std::unique_ptr<ResultTask>
-makeResultTask(FunctionType &&function)
-{
- return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
- (std::forward<FunctionType>(function));
-}
-
-class ResultTaskOperationDone : public spi::OperationComplete {
-public:
- ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId,
- std::unique_ptr<ResultTask> task)
- : _executor(executor),
- _task(std::move(task)),
- _executorId(executor.getExecutorId(bucketId.getId()))
- {
- }
- void onComplete(spi::Result::UP result) override {
- _task->setResult(std::move(result));
- _executor.executeTask(_executorId, std::move(_task));
- }
- void addResultHandler(const spi::ResultHandler * resultHandler) override {
- _task->addResultHandler(resultHandler);
- }
-private:
- vespalib::ISequencedTaskExecutor & _executor;
- std::unique_ptr<ResultTask> _task;
- vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
-};
-
vespalib::string
createThreadName(size_t stripeId) {
- return vespalib::make_string("Thread %zu", stripeId);
+ return vespalib::make_string("PersistenceThread-%zu", stripeId);
}
}
@@ -108,10 +39,10 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence
: _stripeId(filestorHandler.getNextStripeId()),
_component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))),
_env(configUri, *_component, filestorHandler, metrics, provider),
- _sequencedExecutor(sequencedExecutor),
_spi(provider),
_processAllHandler(_env, provider),
- _mergeHandler(_spi, _env),
+ _mergeHandler(_env, _spi),
+ _asyncHandler(_env, _spi, sequencedExecutor),
_bucketOwnershipNotifier()
{
_bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler);
@@ -127,133 +58,6 @@ PersistenceThread::~PersistenceThread()
LOG(debug, "Persistence thread done with destruction");
}
-spi::Bucket
-PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucket) const
-{
- BucketId docBucket(_env._bucketFactory.getBucketId(id));
- docBucket.setUsedBits(bucket.getBucketId().getUsedBits());
- if (bucket.getBucketId() != docBucket) {
- docBucket = _env._bucketFactory.getBucketId(id);
- throw vespalib::IllegalStateException("Document " + id.toString()
- + " (bucket " + docBucket.toString() + ") does not belong in "
- + "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC);
- }
-
- return spi::Bucket(bucket);
-}
-
-bool
-PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
- return cmd.getCondition().isPresent();
-}
-
-bool
-PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- spi::Context & context, bool missingDocumentImpliesMatch) {
- try {
- TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch);
-
- auto code = helper.retrieveAndMatch(context);
- if (code.failed()) {
- tracker.fail(code.getResult(), code.getMessage());
- return false;
- }
- } catch (const TestAndSetException & e) {
- auto code = e.getCode();
- tracker.fail(code.getResult(), code.getMessage());
- return false;
- }
-
- return true;
-}
-
-MessageTracker::UP
-PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP)
-{
- MessageTracker & tracker = *trackerUP;
- auto& metrics = _env._metrics.put[cmd.getLoadType()];
- tracker.setMetric(metrics);
- metrics.request_size.addValue(cmd.getApproxByteSize());
-
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
- // Will also count condition parse failures etc as TaS failures, but
- // those results _will_ increase the error metrics as well.
- metrics.test_and_set_failed.inc();
- return trackerUP;
- }
-
- spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
- auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
- tracker->sendReply();
- });
- _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
-
- return trackerUP;
-}
-
-MessageTracker::UP
-PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP)
-{
- MessageTracker & tracker = *trackerUP;
- auto& metrics = _env._metrics.remove[cmd.getLoadType()];
- tracker.setMetric(metrics);
- metrics.request_size.addValue(cmd.getApproxByteSize());
-
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
- metrics.test_and_set_failed.inc();
- return trackerUP;
- }
-
- spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
-
- // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
- auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- }
- if (!response.wasFound()) {
- metrics.notFound.inc();
- }
- tracker->sendReply();
- });
- _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
- return trackerUP;
-}
-
-MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP)
-{
- MessageTracker & tracker = *trackerUP;
- auto& metrics = _env._metrics.update[cmd.getLoadType()];
- tracker.setMetric(metrics);
- metrics.request_size.addValue(cmd.getApproxByteSize());
-
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) {
- metrics.test_and_set_failed.inc();
- return trackerUP;
- }
-
- spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
-
- // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
- auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- auto reply = std::make_shared<api::UpdateReply>(cmd);
- reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(std::move(reply));
- }
- tracker->sendReply();
- });
- _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
- return trackerUP;
-}
-
namespace {
spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept {
@@ -291,8 +95,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker)
if ( ! fieldSet) { return tracker; }
tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
- spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context());
+ spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()),
+ *fieldSet, cmd.getDocumentId(), tracker->context());
if (tracker->checkForError(result)) {
if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) {
@@ -801,11 +605,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case api::MessageType::GET_ID:
return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
case api::MessageType::PUT_ID:
- return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVE_ID:
- return handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
case api::MessageType::UPDATE_ID:
- return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
case api::MessageType::REVERT_ID:
return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 55e1fdc9ae6..2a3ff813c7e 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -5,6 +5,7 @@
#include "diskthread.h"
#include "processallhandler.h"
#include "mergehandler.h"
+#include "asynchandler.h"
#include "persistenceutil.h"
#include "provider_error_wrapper.h"
#include <vespa/storage/common/bucketmessages.h>
@@ -15,7 +16,6 @@
namespace storage {
class BucketOwnershipNotifier;
-class TestAndSetHelper;
class PersistenceThread final : public DiskThread, public Types
{
@@ -29,9 +29,6 @@ public:
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker);
@@ -46,14 +43,15 @@ public:
MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker);
+ const AsyncHandler & asyncHandler() const { return _asyncHandler; }
private:
uint32_t _stripeId;
ServiceLayerComponent::UP _component;
PersistenceUtil _env;
- vespalib::ISequencedTaskExecutor & _sequencedExecutor;
spi::PersistenceProvider& _spi;
ProcessAllHandler _processAllHandler;
MergeHandler _mergeHandler;
+ AsyncHandler _asyncHandler;
framework::Thread::UP _thread;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
@@ -75,12 +73,6 @@ private:
// Thread main loop
void run(framework::ThreadHandle&) override;
- spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const;
-
- friend class TestAndSetHelper;
- static bool tasConditionExists(const api::TestAndSetCommand & cmd);
- bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- spi::Context & context, bool missingDocumentImpliesMatch = false);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 083b15abe13..1f1eb680b91 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -3,6 +3,7 @@
#include "persistenceutil.h"
#include <vespa/config/config.h>
#include <vespa/config/helper/configgetter.hpp>
+#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.util");
@@ -285,4 +286,19 @@ PersistenceUtil::shutdown(const std::string& reason)
_component.requestShutdown(reason);
}
+spi::Bucket
+PersistenceUtil::getBucket(const document::DocumentId& id, const document::Bucket &bucket) const
+{
+ document::BucketId docBucket(_bucketFactory.getBucketId(id));
+ docBucket.setUsedBits(bucket.getBucketId().getUsedBits());
+ if (bucket.getBucketId() != docBucket) {
+ docBucket = _bucketFactory.getBucketId(id);
+ throw vespalib::IllegalStateException("Document " + id.toString()
+ + " (bucket " + docBucket.toString() + ") does not belong in "
+ + "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC);
+ }
+
+ return spi::Bucket(bucket);
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index f2da4256ea0..bceaf544a9e 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -113,11 +113,11 @@ struct PersistenceUtil {
~PersistenceUtil();
- StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace)
- { return _component.getBucketDatabase(bucketSpace); }
+ StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) {
+ return _component.getBucketDatabase(bucketSpace);
+ }
- void updateBucketDatabase(const document::Bucket &bucket,
- const api::BucketInfo& info);
+ void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info);
uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const;
@@ -139,6 +139,8 @@ struct PersistenceUtil {
void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket);
+ spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const;
+
static uint32_t convertErrorCode(const spi::Result& response);
void shutdown(const std::string& reason);
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index 15f7c1fffb7..0a1141a9ab3 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -2,6 +2,9 @@
#include "processallhandler.h"
#include "bucketprocessor.h"
+#include "persistenceutil.h"
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/log/log.h>
@@ -9,8 +12,7 @@ LOG_SETUP(".persistence.processall");
namespace storage {
-ProcessAllHandler::ProcessAllHandler(PersistenceUtil& env,
- spi::PersistenceProvider& spi)
+ProcessAllHandler::ProcessAllHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
: _env(env),
_spi(spi)
{
@@ -36,17 +38,9 @@ public:
{}
void process(spi::DocEntry& entry) override {
- spi::RemoveResult removeResult = _provider.remove(
- _bucket,
- entry.getTimestamp(),
- *entry.getDocumentId(),
- _context);
-
+ spi::RemoveResult removeResult = _provider.remove(_bucket, entry.getTimestamp(), *entry.getDocumentId(),_context);
if (removeResult.getErrorCode() != spi::Result::ErrorType::NONE) {
- std::ostringstream ss;
- ss << "Failed to do remove for removelocation: "
- << removeResult.getErrorMessage();
- throw std::runtime_error(ss.str());
+ throw std::runtime_error(vespalib::make_string("Failed to do remove for removelocation: %s", removeResult.getErrorMessage().c_str()));
}
++_n_removed;
}
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 87c3c63b8fe..9c0f8905744 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -1,26 +1,23 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/storage/persistence/messages.h>
-#include <vespa/storage/persistence/persistenceutil.h>
-#include <vespa/storageapi/message/persistence.h>
+#include "types.h"
+#include "messages.h"
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-
-namespace document::select { class Node; }
namespace storage {
-class ProcessAllHandler : public Types {
+namespace spi { struct PersistenceProvider; }
+struct PersistenceUtil;
+class ProcessAllHandler : public Types {
public:
- ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&);
- MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker);
- MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker);
-
-protected:
- PersistenceUtil& _env;
+ ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&);
+ MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker);
+ MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker);
+private:
+ const PersistenceUtil& _env;
spi::PersistenceProvider& _spi;
};
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp
index 523f5a52885..f222325053c 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.cpp
+++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp
@@ -1,8 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
// @author Vegard Sjonfjell
-#include "fieldvisitor.h"
#include "testandsethelper.h"
+#include "persistenceutil.h"
+#include "fieldvisitor.h"
#include <vespa/document/select/parser.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -24,7 +25,7 @@ void TestAndSetHelper::resolveDocumentType(const document::DocumentTypeRepo & do
}
void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo) {
- document::select::Parser parser(documentTypeRepo, _component.getBucketIdFactory());
+ document::select::Parser parser(documentTypeRepo, _env._bucketFactory);
try {
_docSelectionUp = parser.parse(_cmd.getCondition().getSelection());
@@ -34,25 +35,20 @@ void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo &
}
spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) {
- return _thread._spi.get(
- _thread.getBucket(_docId, _cmd.getBucket()),
- fieldSet,
- _cmd.getDocumentId(),
- context);
+ return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), fieldSet,_cmd.getDocumentId(),context);
}
-TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd,
- bool missingDocumentImpliesMatch)
- : _thread(thread),
- _component(thread._env._component),
+TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & spi,
+ const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch)
+ : _env(env),
+ _spi(spi),
_cmd(cmd),
_docId(cmd.getDocumentId()),
_docTypePtr(_cmd.getDocumentType()),
_missingDocumentImpliesMatch(missingDocumentImpliesMatch)
{
- auto docTypeRepo = _component.getTypeRepo()->documentTypeRepo;
- resolveDocumentType(*docTypeRepo);
- parseDocumentSelection(*docTypeRepo);
+ resolveDocumentType(*env._repo);
+ parseDocumentSelection(*env._repo);
}
TestAndSetHelper::~TestAndSetHelper() = default;
@@ -72,7 +68,7 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) {
if (_docSelectionUp->contains(*docPtr) != document::select::Result::True) {
return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
vespalib::make_string("Condition did not match document nodeIndex=%d bucket=%" PRIx64 " %s",
- _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
+ _env._nodeIndex, _cmd.getBucketId().getRawId(),
_cmd.hasBeenRemapped() ? "remapped" : ""));
}
@@ -84,7 +80,7 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) {
return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
vespalib::make_string("Document does not exist nodeIndex=%d bucket=%" PRIx64 " %s",
- _thread._env._nodeIndex, _cmd.getBucketId().getRawId(),
+ _env._nodeIndex, _cmd.getBucketId().getRawId(),
_cmd.hasBeenRemapped() ? "remapped" : ""));
}
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.h b/storage/src/vespa/storage/persistence/testandsethelper.h
index f2c55d5ba11..ca4e452c621 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.h
+++ b/storage/src/vespa/storage/persistence/testandsethelper.h
@@ -2,10 +2,23 @@
// @author Vegard Sjonfjell
#pragma once
-#include <vespa/storage/persistence/persistencethread.h>
+
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/persistence/spi/result.h>
+
+namespace document::select { class Node; }
+namespace document { class FieldSet; }
namespace storage {
+namespace spi {
+ class Context;
+ class PersistenceProvider;
+}
+class PersistenceThread;
+class ServiceLayerComponent;
+class PersistenceUtil;
+
class TestAndSetException : public std::runtime_error {
api::ReturnCode _code;
@@ -19,22 +32,21 @@ public:
};
class TestAndSetHelper {
- PersistenceThread & _thread;
- ServiceLayerComponent & _component;
- const api::TestAndSetCommand & _cmd;
-
- const document::DocumentId _docId;
- const document::DocumentType * _docTypePtr;
+ const PersistenceUtil &_env;
+ const spi::PersistenceProvider &_spi;
+ const api::TestAndSetCommand &_cmd;
+ const document::DocumentId _docId;
+ const document::DocumentType * _docTypePtr;
std::unique_ptr<document::select::Node> _docSelectionUp;
- bool _missingDocumentImpliesMatch;
+ bool _missingDocumentImpliesMatch;
void resolveDocumentType(const document::DocumentTypeRepo & documentTypeRepo);
void parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo);
spi::GetResult retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context);
public:
- TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd,
- bool missingDocumentImpliesMatch = false);
+ TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & _spi,
+ const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch = false);
~TestAndSetHelper();
api::ReturnCode retrieveAndMatch(spi::Context & context);
};
diff --git a/storage/src/vespa/storage/persistence/types.h b/storage/src/vespa/storage/persistence/types.h
index 1616395dd32..6a73adc8b0c 100644
--- a/storage/src/vespa/storage/persistence/types.h
+++ b/storage/src/vespa/storage/persistence/types.h
@@ -14,6 +14,8 @@
namespace storage {
+class MessageTracker;
+
struct Types {
typedef document::BucketId BucketId;
typedef document::Document Document;
@@ -25,6 +27,7 @@ struct Types {
typedef api::BucketInfo BucketInfo;
typedef api::ReturnCode ReturnCode;
typedef StorBucketDatabase::WrappedEntry BucketDBEntry;
+ using MessageTrackerUP = std::unique_ptr<MessageTracker>;
static const framework::MicroSecTime MAX_TIMESTAMP;
static const framework::MicroSecTime UNSET_TIMESTAMP;