diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-18 11:27:37 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-18 11:27:37 +0100 |
commit | 93a5928d69b92db5d90f721d8eaa7bda26b5a805 (patch) | |
tree | 99b3f765fc7ce87ecc977c59b509472d87565905 /storage | |
parent | 9de6b0dc0fcc736112b17a35a3de053bd4039913 (diff) |
Remove sync apply bucket diff.
Diffstat (limited to 'storage')
6 files changed, 37 insertions, 84 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 6288f86993d..e9d399d357f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -21,11 +21,9 @@ using namespace ::testing; namespace storage { /* - * Class for testing merge handler taking async_apply_bucket_diff as - * parameter for the test. + * Class for testing merge handler. */ -struct MergeHandlerTest : PersistenceTestUtils, - public testing::WithParamInterface<bool> { +struct MergeHandlerTest : PersistenceTestUtils { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. -TEST_P(MergeHandlerTest, merge_bucket_command) { +TEST_F(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. -TEST_P(MergeHandlerTest, master_message_flow) { +TEST_F(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_P(MergeHandlerTest, max_timestamp) { +TEST_F(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_P(MergeHandlerTest, spi_flush_guard) { +TEST_F(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_P(MergeHandlerTest, bucket_not_found_in_db) { +TEST_F(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_P(MergeHandlerTest, merge_progress_safe_guard) { +TEST_F(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); - if (test.GetParam()) { - convert_delayed_error_to_exception(test, handler); - } + convert_delayed_error_to_exception(test, handler); } std::string @@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_P(MergeHandlerTest, remove_from_diff) { +TEST_F(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 0, 0); @@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) { } } -TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - if (GetParam()) { - ASSERT_FALSE(tracker); - handler.drain_async_writes(); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } else { - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); tracker.reset(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } -TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } - if (GetParam()) { - handler.drain_async_writes(); - } + handler.drain_async_writes(); ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); } -VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); - } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f5b9da0e1f5..4a215c7a348 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -226,11 +226,6 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) *_filestorHandler, i % numStripes, _component)); } _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); - } else { - std::lock_guard guard(_lock); - for (auto& handler : _persistenceHandlers) { - handler->configure(*config); - } } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7dcf4bcbee2..4a5362f3d8d 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -27,8 +27,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize, - bool async_apply_bucket_diff) + uint32_t commonMergeChainOptimalizationMinimumSize) : _clock(clock), _cluster_context(cluster_context), _env(env), @@ -37,7 +36,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff), _executor(executor) { } @@ -1281,9 +1279,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); - if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { - check_apply_diff_sync(std::move(async_results)); - } } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1388,9 +1383,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa if (applyDiffHasLocallyNeededData(diff, index)) { async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); - if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { - check_apply_diff_sync(std::move(async_results)); - } } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1481,12 +1473,6 @@ MergeHandler::drain_async_writes() } void -MergeHandler::configure(bool async_apply_bucket_diff) noexcept -{ - _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); -} - -void MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const { auto bucket_id = state->get_bucket().getBucketId(); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 1007f35c241..e1c821aab48 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -50,8 +50,7 @@ public: const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, - uint32_t commonMergeChainOptimalizationMinimumSize = 64, - bool async_apply_bucket_diff = false); + uint32_t commonMergeChainOptimalizationMinimumSize = 64); ~MergeHandler() override; @@ -79,7 +78,6 @@ public: MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); - void configure(bool async_apply_bucket_diff) noexcept; private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; @@ -91,7 +89,6 @@ private: std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; - std::atomic<bool> _async_apply_bucket_diff; vespalib::ISequencedTaskExecutor& _executor; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index c0c95ffd7af..569da873cea 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -19,8 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _processAllHandler(_env, provider), _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, - cfg.commonMergeChainOptimalizationMinimumSize, - cfg.asyncApplyBucketDiff), + cfg.commonMergeChainOptimalizationMinimumSize), _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) @@ -171,10 +170,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } -void -PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept -{ - _mergeHandler.configure(config.asyncApplyBucketDiff); -} - } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index c60fb05e56e..a92c2dc78ca 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -35,7 +35,6 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } - void configure(vespa::config::content::StorFilestorConfig& config) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; |