diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-16 10:47:21 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-16 10:47:21 +0000 |
commit | fdc2e3e8f764974255ebb72d9a4481be6f9e160c (patch) | |
tree | 3f5ca6ff7c8bb186aa7abcca8aa7a0cb3f597624 /storage | |
parent | 1e7398e49cada7ab43b79752d8fba9a1b72344a1 (diff) |
Factor out a handler for async operations to decouple code.
Diffstat (limited to 'storage')
16 files changed, 377 insertions, 318 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 565d8b5ee3c..40b0d8eb2ba 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -199,7 +199,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. TEST_F(MergeHandlerTest, merge_bucket_command) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -224,7 +224,7 @@ void MergeHandlerTest::testGetBucketDiffChain(bool midChain) { setUpChain(midChain ? MIDDLE : BACK); - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); LOG(debug, "Verifying that get bucket diff is sent on"); auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); @@ -273,7 +273,7 @@ void MergeHandlerTest::testApplyBucketDiffChain(bool midChain) { setUpChain(midChain ? MIDDLE : BACK); - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); LOG(debug, "Verifying that apply bucket diff is sent on"); auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); @@ -320,7 +320,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. TEST_F(MergeHandlerTest, master_message_flow) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -421,7 +421,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { doPut(1234, spi::Timestamp(4000 + i), docSize, docSize); } - MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); + MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -504,7 +504,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize); applyBucketDiffCmd->getDiff() = applyDiff; - MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); + MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize); handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); @@ -516,7 +516,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { TEST_F(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -624,7 +624,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, TEST_F(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -644,7 +644,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { } TEST_F(MergeHandlerTest, bucket_not_found_in_db) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -652,7 +652,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) { } TEST_F(MergeHandlerTest, merge_progress_safe_guard) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -675,7 +675,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { } TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); _nodes.clear(); _nodes.emplace_back(0, false); _nodes.emplace_back(1, false); @@ -707,7 +707,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { } TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { api::ApplyBucketDiffCommand::Entry e; @@ -815,7 +815,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -847,7 +847,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -880,7 +880,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -945,7 +945,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); HandleGetBucketDiffReplyInvoker invoker; @@ -1036,7 +1036,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { ChainPos pos(i == 0 ? FRONT : MIDDLE); setUpChain(pos); invoker.setChainPos(pos); - MergeHandler handler(providerWrapper, getEnv()); + MergeHandler handler(getEnv(), providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -1128,7 +1128,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { spi::Timestamp ts(10111); doPut(doc, ts); - MergeHandler handler(getPersistenceProvider(), getEnv()); + MergeHandler handler(getEnv(), getPersistenceProvider()); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { api::ApplyBucketDiffCommand::Entry e; diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index ce041660a2f..e4c5227f951 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -86,7 +86,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -106,7 +106,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -126,7 +126,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -146,7 +146,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -172,7 +172,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +185,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +197,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +206,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -218,7 +218,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -228,9 +228,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { api::Timestamp timestamp = 0; auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); setTestCondition(*put); - thread->handlePut(*put, createTracker(put, BUCKET)); + thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -274,7 +274,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))); + fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 6b57477fa23..15cd0540338 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_spersistence OBJECT SOURCES + asynchandler.cpp bucketownershipnotifier.cpp bucketprocessor.cpp fieldvisitor.cpp diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp new file mode 100644 index 00000000000..418eeb40ffc --- /dev/null +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -0,0 +1,206 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "asynchandler.h" +#include "persistenceutil.h" +#include "testandsethelper.h" +#include <vespa/document/update/documentupdate.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> + +namespace storage { + +namespace { + +class ResultTask : public vespalib::Executor::Task { +public: + ResultTask() : _result(), _resultHandler(nullptr) {} + + void setResult(spi::Result::UP result) { + _result = std::move(result); + } + + void addResultHandler(const spi::ResultHandler *resultHandler) { + // Only handles a signal handler now, + // Can be extended if necessary later on + assert(_resultHandler == nullptr); + _resultHandler = resultHandler; + } + + void handle(const spi::Result &result) { + if (_resultHandler != nullptr) { + _resultHandler->handle(result); + } + } + +protected: + spi::Result::UP _result; +private: + const spi::ResultHandler *_resultHandler; +}; + +template<class FunctionType> +class LambdaResultTask : public ResultTask { +public: + explicit LambdaResultTask(FunctionType &&func) + : _func(std::move(func)) + {} + + ~LambdaResultTask() override = default; + + void run() override { + handle(*_result); + _func(std::move(_result)); + } + +private: + FunctionType _func; +}; + +template<class FunctionType> +std::unique_ptr<ResultTask> +makeResultTask(FunctionType &&function) { + return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + +class ResultTaskOperationDone : public spi::OperationComplete { +public: + ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId, + std::unique_ptr<ResultTask> task) + : _executor(executor), + _task(std::move(task)), + _executorId(executor.getExecutorId(bucketId.getId())) + { + } + void onComplete(spi::Result::UP result) override { + _task->setResult(std::move(result)); + _executor.executeTask(_executorId, std::move(_task)); + } + void addResultHandler(const spi::ResultHandler * resultHandler) override { + _task->addResultHandler(resultHandler); + } +private: + vespalib::ISequencedTaskExecutor & _executor; + std::unique_ptr<ResultTask> _task; + vespalib::ISequencedTaskExecutor::ExecutorId _executorId; +}; + +} +AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, + vespalib::ISequencedTaskExecutor & executor) + : _env(env), + _spi(spi), + _sequencedExecutor(executor) +{} + +MessageTracker::UP +AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) const +{ + MessageTracker & tracker = *trackerUP; + auto& metrics = _env._metrics.put[cmd.getLoadType()]; + tracker.setMetric(metrics); + metrics.request_size.addValue(cmd.getApproxByteSize()); + + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { + // Will also count condition parse failures etc as TaS failures, but + // those results _will_ increase the error metrics as well. + metrics.test_and_set_failed.inc(); + return trackerUP; + } + + spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket()); + auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + + return trackerUP; +} + +MessageTracker::UP +AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const +{ + MessageTracker & tracker = *trackerUP; + auto& metrics = _env._metrics.update[cmd.getLoadType()]; + tracker.setMetric(metrics); + metrics.request_size.addValue(cmd.getApproxByteSize()); + + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) { + metrics.test_and_set_failed.inc(); + return trackerUP; + } + + spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket()); + + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); + if (tracker->checkForError(response)) { + auto reply = std::make_shared<api::UpdateReply>(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker->setReply(std::move(reply)); + } + tracker->sendReply(); + }); + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +MessageTracker::UP +AsyncHandler::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP) const +{ + MessageTracker & tracker = *trackerUP; + auto& metrics = _env._metrics.remove[cmd.getLoadType()]; + tracker.setMetric(metrics); + metrics.request_size.addValue(cmd.getApproxByteSize()); + + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { + metrics.test_and_set_failed.inc(); + return trackerUP; + } + + spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket()); + + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); + if (tracker->checkForError(response)) { + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + } + if (!response.wasFound()) { + metrics.notFound.inc(); + } + tracker->sendReply(); + }); + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +bool +AsyncHandler::tasConditionExists(const api::TestAndSetCommand & cmd) { + return cmd.getCondition().isPresent(); +} + +bool +AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, + spi::Context & context, bool missingDocumentImpliesMatch) const { + try { + TestAndSetHelper helper(_env, _spi, cmd, missingDocumentImpliesMatch); + + auto code = helper.retrieveAndMatch(context); + if (code.failed()) { + tracker.fail(code.getResult(), code.getMessage()); + return false; + } + } catch (const TestAndSetException & e) { + auto code = e.getCode(); + tracker.fail(code.getResult(), code.getMessage()); + return false; + } + + return true; +} +} diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h new file mode 100644 index 00000000000..4c525b10152 --- /dev/null +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "types.h" +#include <vespa/storageapi/message/persistence.h> + +namespace vespalib { class ISequencedTaskExecutor; } +namespace storage { + +namespace spi { + struct PersistenceProvider; + class Context; +} +struct PersistenceUtil; + +class AsyncHandler : public Types { + +public: + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor); + MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; +private: + static bool tasConditionExists(const api::TestAndSetCommand & cmd); + bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, + spi::Context & context, bool missingDocumentImpliesMatch = false) const; + const PersistenceUtil & _env; + spi::PersistenceProvider & _spi; + vespalib::ISequencedTaskExecutor & _sequencedExecutor; +}; + +} // storage + diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index acbbe782099..ce033bf6e19 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mergehandler.h" +#include "persistenceutil.h" #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> @@ -13,16 +14,16 @@ LOG_SETUP(".persistence.mergehandler"); namespace storage { -MergeHandler::MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil& env) - : _spi(spi), - _env(env), +MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi) + : _env(env), + _spi(spi), _maxChunkSize(env._config.bucketMergeChunkSize) { } -MergeHandler::MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil& env, uint32_t maxChunkSize) - : _spi(spi), - _env(env), +MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize) + : _env(env), + _spi(spi), _maxChunkSize(maxChunkSize) { } diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 7052258ec03..9e200f62307 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -13,14 +13,18 @@ */ #pragma once -#include <vespa/persistence/spi/persistenceprovider.h> +#include "types.h" +#include <vespa/persistence/spi/bucket.h> +#include <vespa/persistence/spi/docentry.h> #include <vespa/storage/persistence/filestorage/mergestatus.h> -#include <vespa/storage/persistence/persistenceutil.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/messagesender.h> namespace storage { +namespace spi { struct PersistenceProvider; } +struct PersistenceUtil; + class MergeHandler : public Types { public: @@ -30,11 +34,9 @@ public: DELETED_IN_PLACE = 0x04 }; - MergeHandler(spi::PersistenceProvider& spi, PersistenceUtil&); + MergeHandler(PersistenceUtil&, spi::PersistenceProvider& spi); /** Used for unit testing */ - MergeHandler(spi::PersistenceProvider& spi, - PersistenceUtil& env, - uint32_t maxChunkSize); + MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize); bool buildBucketInfoList( const spi::Bucket& bucket, @@ -55,16 +57,16 @@ public: uint8_t nodeIndex, spi::Context& context); - MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP); - MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP); + MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP); + MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP); void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&); - MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP); + MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP); void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&); private: - spi::PersistenceProvider& _spi; - PersistenceUtil& _env; - uint32_t _maxChunkSize; + PersistenceUtil &_env; + spi::PersistenceProvider &_spi; + uint32_t _maxChunkSize; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 98270561a33..b6ce91c7ddb 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -7,7 +7,6 @@ #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/document/fieldset/fieldsetrepo.h> -#include <vespa/document/update/documentupdate.h> #include <vespa/document/base/exceptions.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> @@ -24,77 +23,9 @@ namespace storage { namespace { -class ResultTask : public vespalib::Executor::Task { -public: - ResultTask() : _result(), _resultHandler(nullptr) { } - void setResult(spi::Result::UP result) { - _result = std::move(result); - } - void addResultHandler(const spi::ResultHandler * resultHandler) { - // Only handles a signal handler now, - // Can be extended if necessary later on - assert(_resultHandler == nullptr); - _resultHandler = resultHandler; - } - void handle(const spi::Result &result ) { - if (_resultHandler != nullptr) { - _resultHandler->handle(result); - } - } -protected: - spi::Result::UP _result; -private: - const spi::ResultHandler * _resultHandler; -}; - -template<class FunctionType> -class LambdaResultTask : public ResultTask { -public: - explicit LambdaResultTask(FunctionType && func) - : _func(std::move(func)) - {} - ~LambdaResultTask() override = default; - void run() override { - handle(*_result); - _func(std::move(_result)); - } -private: - FunctionType _func; -}; - -template <class FunctionType> -std::unique_ptr<ResultTask> -makeResultTask(FunctionType &&function) -{ - return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> - (std::forward<FunctionType>(function)); -} - -class ResultTaskOperationDone : public spi::OperationComplete { -public: - ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId, - std::unique_ptr<ResultTask> task) - : _executor(executor), - _task(std::move(task)), - _executorId(executor.getExecutorId(bucketId.getId())) - { - } - void onComplete(spi::Result::UP result) override { - _task->setResult(std::move(result)); - _executor.executeTask(_executorId, std::move(_task)); - } - void addResultHandler(const spi::ResultHandler * resultHandler) override { - _task->addResultHandler(resultHandler); - } -private: - vespalib::ISequencedTaskExecutor & _executor; - std::unique_ptr<ResultTask> _task; - vespalib::ISequencedTaskExecutor::ExecutorId _executorId; -}; - vespalib::string createThreadName(size_t stripeId) { - return vespalib::make_string("Thread %zu", stripeId); + return vespalib::make_string("PersistenceThread-%zu", stripeId); } } @@ -108,10 +39,10 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence : _stripeId(filestorHandler.getNextStripeId()), _component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))), _env(configUri, *_component, filestorHandler, metrics, provider), - _sequencedExecutor(sequencedExecutor), _spi(provider), _processAllHandler(_env, provider), - _mergeHandler(_spi, _env), + _mergeHandler(_env, _spi), + _asyncHandler(_env, _spi, sequencedExecutor), _bucketOwnershipNotifier() { _bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler); @@ -127,133 +58,6 @@ PersistenceThread::~PersistenceThread() LOG(debug, "Persistence thread done with destruction"); } -spi::Bucket -PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucket) const -{ - BucketId docBucket(_env._bucketFactory.getBucketId(id)); - docBucket.setUsedBits(bucket.getBucketId().getUsedBits()); - if (bucket.getBucketId() != docBucket) { - docBucket = _env._bucketFactory.getBucketId(id); - throw vespalib::IllegalStateException("Document " + id.toString() - + " (bucket " + docBucket.toString() + ") does not belong in " - + "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC); - } - - return spi::Bucket(bucket); -} - -bool -PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { - return cmd.getCondition().isPresent(); -} - -bool -PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - spi::Context & context, bool missingDocumentImpliesMatch) { - try { - TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch); - - auto code = helper.retrieveAndMatch(context); - if (code.failed()) { - tracker.fail(code.getResult(), code.getMessage()); - return false; - } - } catch (const TestAndSetException & e) { - auto code = e.getCode(); - tracker.fail(code.getResult(), code.getMessage()); - return false; - } - - return true; -} - -MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) -{ - MessageTracker & tracker = *trackerUP; - auto& metrics = _env._metrics.put[cmd.getLoadType()]; - tracker.setMetric(metrics); - metrics.request_size.addValue(cmd.getApproxByteSize()); - - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { - // Will also count condition parse failures etc as TaS failures, but - // those results _will_ increase the error metrics as well. - metrics.test_and_set_failed.inc(); - return trackerUP; - } - - spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); - tracker->sendReply(); - }); - _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); - - return trackerUP; -} - -MessageTracker::UP -PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP) -{ - MessageTracker & tracker = *trackerUP; - auto& metrics = _env._metrics.remove[cmd.getLoadType()]; - tracker.setMetric(metrics); - metrics.request_size.addValue(cmd.getApproxByteSize()); - - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { - metrics.test_and_set_failed.inc(); - return trackerUP; - } - - spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - - // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker - auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - auto & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); - if (tracker->checkForError(response)) { - tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - } - if (!response.wasFound()) { - metrics.notFound.inc(); - } - tracker->sendReply(); - }); - _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), - std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); - return trackerUP; -} - -MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) -{ - MessageTracker & tracker = *trackerUP; - auto& metrics = _env._metrics.update[cmd.getLoadType()]; - tracker.setMetric(metrics); - metrics.request_size.addValue(cmd.getApproxByteSize()); - - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) { - metrics.test_and_set_failed.inc(); - return trackerUP; - } - - spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); - - // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker - auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - auto & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); - if (tracker->checkForError(response)) { - auto reply = std::make_shared<api::UpdateReply>(cmd); - reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(std::move(reply)); - } - tracker->sendReply(); - }); - _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); - return trackerUP; -} - namespace { spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept { @@ -291,8 +95,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) if ( ! fieldSet) { return tracker; } tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); - spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context()); + spi::GetResult result = _spi.get(_env.getBucket(cmd.getDocumentId(), cmd.getBucket()), + *fieldSet, cmd.getDocumentId(), tracker->context()); if (tracker->checkForError(result)) { if (!result.hasDocument() && (document::FieldSet::Type::NONE != fieldSet->getType())) { @@ -801,11 +605,11 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::GET_ID: return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); case api::MessageType::PUT_ID: - return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); + return _asyncHandler.handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVE_ID: - return handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker)); case api::MessageType::UPDATE_ID: - return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); case api::MessageType::REVERT_ID: return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 55e1fdc9ae6..2a3ff813c7e 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -5,6 +5,7 @@ #include "diskthread.h" #include "processallhandler.h" #include "mergehandler.h" +#include "asynchandler.h" #include "persistenceutil.h" #include "provider_error_wrapper.h" #include <vespa/storage/common/bucketmessages.h> @@ -15,7 +16,6 @@ namespace storage { class BucketOwnershipNotifier; -class TestAndSetHelper; class PersistenceThread final : public DiskThread, public Types { @@ -29,9 +29,6 @@ public: void flush() override; framework::Thread& getThread() override { return *_thread; } - MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker); @@ -46,14 +43,15 @@ public: MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker); + const AsyncHandler & asyncHandler() const { return _asyncHandler; } private: uint32_t _stripeId; ServiceLayerComponent::UP _component; PersistenceUtil _env; - vespalib::ISequencedTaskExecutor & _sequencedExecutor; spi::PersistenceProvider& _spi; ProcessAllHandler _processAllHandler; MergeHandler _mergeHandler; + AsyncHandler _asyncHandler; framework::Thread::UP _thread; std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; @@ -75,12 +73,6 @@ private: // Thread main loop void run(framework::ThreadHandle&) override; - spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const; - - friend class TestAndSetHelper; - static bool tasConditionExists(const api::TestAndSetCommand & cmd); - bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - spi::Context & context, bool missingDocumentImpliesMatch = false); }; } // storage diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 083b15abe13..1f1eb680b91 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -3,6 +3,7 @@ #include "persistenceutil.h" #include <vespa/config/config.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/vespalib/util/exceptions.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.util"); @@ -285,4 +286,19 @@ PersistenceUtil::shutdown(const std::string& reason) _component.requestShutdown(reason); } +spi::Bucket +PersistenceUtil::getBucket(const document::DocumentId& id, const document::Bucket &bucket) const +{ + document::BucketId docBucket(_bucketFactory.getBucketId(id)); + docBucket.setUsedBits(bucket.getBucketId().getUsedBits()); + if (bucket.getBucketId() != docBucket) { + docBucket = _bucketFactory.getBucketId(id); + throw vespalib::IllegalStateException("Document " + id.toString() + + " (bucket " + docBucket.toString() + ") does not belong in " + + "bucket " + bucket.getBucketId().toString() + ".", VESPA_STRLOC); + } + + return spi::Bucket(bucket); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index f2da4256ea0..bceaf544a9e 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -113,11 +113,11 @@ struct PersistenceUtil { ~PersistenceUtil(); - StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) - { return _component.getBucketDatabase(bucketSpace); } + StorBucketDatabase& getBucketDatabase(document::BucketSpace bucketSpace) { + return _component.getBucketDatabase(bucketSpace); + } - void updateBucketDatabase(const document::Bucket &bucket, - const api::BucketInfo& info); + void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info); uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const; @@ -139,6 +139,8 @@ struct PersistenceUtil { void setBucketInfo(MessageTracker& tracker, const document::Bucket &bucket); + spi::Bucket getBucket(const document::DocumentId& id, const document::Bucket &bucket) const; + static uint32_t convertErrorCode(const spi::Result& response); void shutdown(const std::string& reason); diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 15f7c1fffb7..0a1141a9ab3 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -2,6 +2,9 @@ #include "processallhandler.h" #include "bucketprocessor.h" +#include "persistenceutil.h" +#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/log/log.h> @@ -9,8 +12,7 @@ LOG_SETUP(".persistence.processall"); namespace storage { -ProcessAllHandler::ProcessAllHandler(PersistenceUtil& env, - spi::PersistenceProvider& spi) +ProcessAllHandler::ProcessAllHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) : _env(env), _spi(spi) { @@ -36,17 +38,9 @@ public: {} void process(spi::DocEntry& entry) override { - spi::RemoveResult removeResult = _provider.remove( - _bucket, - entry.getTimestamp(), - *entry.getDocumentId(), - _context); - + spi::RemoveResult removeResult = _provider.remove(_bucket, entry.getTimestamp(), *entry.getDocumentId(),_context); if (removeResult.getErrorCode() != spi::Result::ErrorType::NONE) { - std::ostringstream ss; - ss << "Failed to do remove for removelocation: " - << removeResult.getErrorMessage(); - throw std::runtime_error(ss.str()); + throw std::runtime_error(vespalib::make_string("Failed to do remove for removelocation: %s", removeResult.getErrorMessage().c_str())); } ++_n_removed; } diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 87c3c63b8fe..9c0f8905744 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -1,26 +1,23 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/storage/persistence/messages.h> -#include <vespa/storage/persistence/persistenceutil.h> -#include <vespa/storageapi/message/persistence.h> +#include "types.h" +#include "messages.h" #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> -#include <vespa/persistence/spi/persistenceprovider.h> - -namespace document::select { class Node; } namespace storage { -class ProcessAllHandler : public Types { +namespace spi { struct PersistenceProvider; } +struct PersistenceUtil; +class ProcessAllHandler : public Types { public: - ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&); - MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker); - MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker); - -protected: - PersistenceUtil& _env; + ProcessAllHandler(const PersistenceUtil&, spi::PersistenceProvider&); + MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker); + MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker); +private: + const PersistenceUtil& _env; spi::PersistenceProvider& _spi; }; diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp index 523f5a52885..f222325053c 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.cpp +++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp @@ -1,8 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // @author Vegard Sjonfjell -#include "fieldvisitor.h" #include "testandsethelper.h" +#include "persistenceutil.h" +#include "fieldvisitor.h" #include <vespa/document/select/parser.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/vespalib/util/stringfmt.h> @@ -24,7 +25,7 @@ void TestAndSetHelper::resolveDocumentType(const document::DocumentTypeRepo & do } void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo) { - document::select::Parser parser(documentTypeRepo, _component.getBucketIdFactory()); + document::select::Parser parser(documentTypeRepo, _env._bucketFactory); try { _docSelectionUp = parser.parse(_cmd.getCondition().getSelection()); @@ -34,25 +35,20 @@ void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & } spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) { - return _thread._spi.get( - _thread.getBucket(_docId, _cmd.getBucket()), - fieldSet, - _cmd.getDocumentId(), - context); + return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), fieldSet,_cmd.getDocumentId(),context); } -TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd, - bool missingDocumentImpliesMatch) - : _thread(thread), - _component(thread._env._component), +TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & spi, + const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch) + : _env(env), + _spi(spi), _cmd(cmd), _docId(cmd.getDocumentId()), _docTypePtr(_cmd.getDocumentType()), _missingDocumentImpliesMatch(missingDocumentImpliesMatch) { - auto docTypeRepo = _component.getTypeRepo()->documentTypeRepo; - resolveDocumentType(*docTypeRepo); - parseDocumentSelection(*docTypeRepo); + resolveDocumentType(*env._repo); + parseDocumentSelection(*env._repo); } TestAndSetHelper::~TestAndSetHelper() = default; @@ -72,7 +68,7 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) { if (_docSelectionUp->contains(*docPtr) != document::select::Result::True) { return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, vespalib::make_string("Condition did not match document nodeIndex=%d bucket=%" PRIx64 " %s", - _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), + _env._nodeIndex, _cmd.getBucketId().getRawId(), _cmd.hasBeenRemapped() ? "remapped" : "")); } @@ -84,7 +80,7 @@ TestAndSetHelper::retrieveAndMatch(spi::Context & context) { return api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, vespalib::make_string("Document does not exist nodeIndex=%d bucket=%" PRIx64 " %s", - _thread._env._nodeIndex, _cmd.getBucketId().getRawId(), + _env._nodeIndex, _cmd.getBucketId().getRawId(), _cmd.hasBeenRemapped() ? "remapped" : "")); } diff --git a/storage/src/vespa/storage/persistence/testandsethelper.h b/storage/src/vespa/storage/persistence/testandsethelper.h index f2c55d5ba11..ca4e452c621 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.h +++ b/storage/src/vespa/storage/persistence/testandsethelper.h @@ -2,10 +2,23 @@ // @author Vegard Sjonfjell #pragma once -#include <vespa/storage/persistence/persistencethread.h> + +#include <vespa/storageapi/message/persistence.h> +#include <vespa/persistence/spi/result.h> + +namespace document::select { class Node; } +namespace document { class FieldSet; } namespace storage { +namespace spi { + class Context; + class PersistenceProvider; +} +class PersistenceThread; +class ServiceLayerComponent; +class PersistenceUtil; + class TestAndSetException : public std::runtime_error { api::ReturnCode _code; @@ -19,22 +32,21 @@ public: }; class TestAndSetHelper { - PersistenceThread & _thread; - ServiceLayerComponent & _component; - const api::TestAndSetCommand & _cmd; - - const document::DocumentId _docId; - const document::DocumentType * _docTypePtr; + const PersistenceUtil &_env; + const spi::PersistenceProvider &_spi; + const api::TestAndSetCommand &_cmd; + const document::DocumentId _docId; + const document::DocumentType * _docTypePtr; std::unique_ptr<document::select::Node> _docSelectionUp; - bool _missingDocumentImpliesMatch; + bool _missingDocumentImpliesMatch; void resolveDocumentType(const document::DocumentTypeRepo & documentTypeRepo); void parseDocumentSelection(const document::DocumentTypeRepo & documentTypeRepo); spi::GetResult retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context); public: - TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd, - bool missingDocumentImpliesMatch = false); + TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & _spi, + const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch = false); ~TestAndSetHelper(); api::ReturnCode retrieveAndMatch(spi::Context & context); }; diff --git a/storage/src/vespa/storage/persistence/types.h b/storage/src/vespa/storage/persistence/types.h index 1616395dd32..6a73adc8b0c 100644 --- a/storage/src/vespa/storage/persistence/types.h +++ b/storage/src/vespa/storage/persistence/types.h @@ -14,6 +14,8 @@ namespace storage { +class MessageTracker; + struct Types { typedef document::BucketId BucketId; typedef document::Document Document; @@ -25,6 +27,7 @@ struct Types { typedef api::BucketInfo BucketInfo; typedef api::ReturnCode ReturnCode; typedef StorBucketDatabase::WrappedEntry BucketDBEntry; + using MessageTrackerUP = std::unique_ptr<MessageTracker>; static const framework::MicroSecTime MAX_TIMESTAMP; static const framework::MicroSecTime UNSET_TIMESTAMP; |