diff options
11 files changed, 38 insertions, 102 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 1aaa7f71f8e..2b1105803e2 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -43,10 +43,6 @@ common_merge_chain_optimalization_minimum_size int default=64 restart ## Note that this will gradually be increased to reach stor-distributormanager:splitsize which is currently at 32M bucket_merge_chunk_size int default=33554432 restart -## If set, portions of apply bucket diff handling will be performed asynchronously -## with persistence thread not waiting for local writes to complete. -async_apply_bucket_diff bool default=true - ## When merging, it is possible to send more metadata than needed in order to ## let local nodes in merge decide which entries fits best to add this time ## based on disk location. Toggle this option on to use it. Note that memory diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index c1dbe9b2bd2..beb1e0bd6bc 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -465,7 +465,6 @@ App::usage() "USAGE:\n"; std::cerr << "vespa-redistribute-bm\n" - "[--async-apply-bucket-diff]\n" "[--bucket-db-stripe-bits bits]\n" "[--client-threads threads]\n" "[--distributor-merge-busy-wait distributor-merge-busy-wait]\n" @@ -502,7 +501,6 @@ App::get_options() const char *opt_argument = nullptr; int long_opt_index = 0; static struct option long_opts[] = { - { "async-apply-bucket-diff", 0, nullptr, 0 }, { "bucket-db-stripe-bits", 1, nullptr, 0 }, { "client-threads", 1, nullptr, 0 }, { "distributor-merge-busy-wait", 1, nullptr, 0 }, @@ -533,7 +531,6 @@ App::get_options() { nullptr, 0, nullptr, 0 } }; enum longopts_enum { - LONGOPT_ASYNC_APPLY_BUCKET_DIFF, LONGOPT_BUCKET_DB_STRIPE_BITS, LONGOPT_CLIENT_THREADS, LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT, @@ -568,9 +565,6 @@ App::get_options() switch (c) { case 0: switch(long_opt_index) { - case LONGOPT_ASYNC_APPLY_BUCKET_DIFF: - _bm_params.set_async_apply_bucket_diff(true); - break; case LONGOPT_BUCKET_DB_STRIPE_BITS: _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument)); break; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index 4a3466f1a51..3ff10b19164 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -6,8 +6,7 @@ namespace search::bmcluster { BmClusterParams::BmClusterParams() - : _async_apply_bucket_diff(), - _bucket_db_stripe_bits(4), + : _bucket_db_stripe_bits(4), _disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def _distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def _distributor_stripes(0), diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index 36b4c22f6a8..d365a28b0b6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -13,7 +13,6 @@ namespace search::bmcluster { */ class BmClusterParams { - std::optional<bool> _async_apply_bucket_diff; uint32_t _bucket_db_stripe_bits; bool _disable_queue_limits_for_chained_merges; uint32_t _distributor_merge_busy_wait; @@ -45,7 +44,6 @@ class BmClusterParams public: BmClusterParams(); ~BmClusterParams(); - const std::optional<bool>& get_async_apply_bucket_diff() const noexcept { return _async_apply_bucket_diff; } uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; } uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; } @@ -75,7 +73,6 @@ public: bool needs_distributor() const { return _enable_distributor || _use_document_api; } bool needs_message_bus() const { return _use_message_bus || _use_document_api; } bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } - void set_async_apply_bucket_diff(bool value) { _async_apply_bucket_diff = value; } void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; } void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index db2060bacf7..3587e8008f2 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -382,9 +382,6 @@ struct ServiceLayerConfigSet : public StorageConfigSet stor_bucket_init(), stor_visitor() { - if (params.get_async_apply_bucket_diff().has_value()) { - stor_filestor.asyncApplyBucketDiff = params.get_async_apply_bucket_diff().value(); - } stor_filestor.numResponseThreads = params.get_response_threads(); stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 6288f86993d..e9d399d357f 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -21,11 +21,9 @@ using namespace ::testing; namespace storage { /* - * Class for testing merge handler taking async_apply_bucket_diff as - * parameter for the test. + * Class for testing merge handler. */ -struct MergeHandlerTest : PersistenceTestUtils, - public testing::WithParamInterface<bool> { +struct MergeHandlerTest : PersistenceTestUtils { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils, MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64); } std::shared_ptr<api::StorageMessage> get_queued_reply() { @@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. -TEST_P(MergeHandlerTest, merge_bucket_command) { +TEST_F(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. -TEST_P(MergeHandlerTest, master_message_flow) { +TEST_F(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_P(MergeHandlerTest, max_timestamp) { +TEST_F(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_P(MergeHandlerTest, spi_flush_guard) { +TEST_F(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_P(MergeHandlerTest, bucket_not_found_in_db) { +TEST_F(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_P(MergeHandlerTest, merge_progress_safe_guard) { +TEST_F(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); - if (GetParam()) { - convert_delayed_error_to_exception(handler); - } + convert_delayed_error_to_exception(handler); if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); - if (test.GetParam()) { - convert_delayed_error_to_exception(test, handler); - } + convert_delayed_error_to_exception(test, handler); } std::string @@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_P(MergeHandlerTest, remove_from_diff) { +TEST_F(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 0, 0); @@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) { } } -TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - if (GetParam()) { - ASSERT_FALSE(tracker); - handler.drain_async_writes(); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } else { - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); - } + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); tracker.reset(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } -TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } - if (GetParam()) { - handler.drain_async_writes(); - } + handler.drain_async_writes(); ASSERT_EQ(6u, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType()); LOG(debug, "got mergebucket reply"); } -VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); - } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f5b9da0e1f5..4a215c7a348 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -226,11 +226,6 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) *_filestorHandler, i % numStripes, _component)); } _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); - } else { - std::lock_guard guard(_lock); - for (auto& handler : _persistenceHandlers) { - handler->configure(*config); - } } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7dcf4bcbee2..4a5362f3d8d 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -27,8 +27,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize, - bool async_apply_bucket_diff) + uint32_t commonMergeChainOptimalizationMinimumSize) : _clock(clock), _cluster_context(cluster_context), _env(env), @@ -37,7 +36,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _async_apply_bucket_diff(async_apply_bucket_diff), _executor(executor) { } @@ -1281,9 +1279,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); - if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { - check_apply_diff_sync(std::move(async_results)); - } } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1388,9 +1383,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa if (applyDiffHasLocallyNeededData(diff, index)) { async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); - if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { - check_apply_diff_sync(std::move(async_results)); - } } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1481,12 +1473,6 @@ MergeHandler::drain_async_writes() } void -MergeHandler::configure(bool async_apply_bucket_diff) noexcept -{ - _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); -} - -void MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const { auto bucket_id = state->get_bucket().getBucketId(); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 1007f35c241..e1c821aab48 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -50,8 +50,7 @@ public: const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, uint32_t maxChunkSize = 4190208, - uint32_t commonMergeChainOptimalizationMinimumSize = 64, - bool async_apply_bucket_diff = false); + uint32_t commonMergeChainOptimalizationMinimumSize = 64); ~MergeHandler() override; @@ -79,7 +78,6 @@ public: MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); - void configure(bool async_apply_bucket_diff) noexcept; private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; @@ -91,7 +89,6 @@ private: std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; - std::atomic<bool> _async_apply_bucket_diff; vespalib::ISequencedTaskExecutor& _executor; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index c0c95ffd7af..569da873cea 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -19,8 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _processAllHandler(_env, provider), _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize, - cfg.commonMergeChainOptimalizationMinimumSize, - cfg.asyncApplyBucketDiff), + cfg.commonMergeChainOptimalizationMinimumSize), _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) @@ -171,10 +170,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } -void -PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept -{ - _mergeHandler.configure(config.asyncApplyBucketDiff); -} - } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index c60fb05e56e..a92c2dc78ca 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -35,7 +35,6 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } - void configure(vespa::config::content::StorFilestorConfig& config) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; |