diff options
6 files changed, 40 insertions, 26 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 40b0d8eb2ba..8cf9a7ac661 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -154,6 +154,15 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { PersistenceProviderWrapper& providerWrapper, HandlerInvoker& invoker, const ExpectedExceptionSpec& spec); + + MergeHandler createHandler(size_t maxChunkSize = 0x400000) { + return MergeHandler(getEnv(), getPersistenceProvider(), + getEnv()._component.getClusterName(), getEnv()._component.getClock(), maxChunkSize); + } + MergeHandler createHandler(spi::PersistenceProvider & spi) { + return MergeHandler(getEnv(), spi, + getEnv()._component.getClusterName(), getEnv()._component.getClock()); + } }; MergeHandlerTest::HandleGetBucketDiffReplyInvoker::HandleGetBucketDiffReplyInvoker() = default; @@ -199,7 +208,8 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. TEST_F(MergeHandlerTest, merge_bucket_command) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler(getEnv(), getPersistenceProvider(), + getEnv()._component.getClusterName(), getEnv()._component.getClock()); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -224,7 +234,7 @@ void MergeHandlerTest::testGetBucketDiffChain(bool midChain) { setUpChain(midChain ? MIDDLE : BACK); - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); LOG(debug, "Verifying that get bucket diff is sent on"); auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); @@ -273,7 +283,7 @@ void MergeHandlerTest::testApplyBucketDiffChain(bool midChain) { setUpChain(midChain ? MIDDLE : BACK); - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); LOG(debug, "Verifying that apply bucket diff is sent on"); auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); @@ -320,7 +330,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. TEST_F(MergeHandlerTest, master_message_flow) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -421,7 +431,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { doPut(1234, spi::Timestamp(4000 + i), docSize, docSize); } - MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize); + MergeHandler handler = createHandler(maxChunkSize); LOG(debug, "Handle a merge bucket command"); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); @@ -504,7 +514,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize); applyBucketDiffCmd->getDiff() = applyDiff; - MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize); + MergeHandler handler = createHandler(maxChunkSize); handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); @@ -516,7 +526,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { TEST_F(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -624,7 +634,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, TEST_F(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -644,7 +654,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { } TEST_F(MergeHandlerTest, bucket_not_found_in_db) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -652,7 +662,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) { } TEST_F(MergeHandlerTest, merge_progress_safe_guard) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -675,7 +685,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { } TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); _nodes.emplace_back(1, false); @@ -707,7 +717,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { } TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { api::ApplyBucketDiffCommand::Entry e; @@ -815,7 +825,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -847,7 +857,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -880,7 +890,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); @@ -945,7 +955,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); HandleGetBucketDiffReplyInvoker invoker; @@ -1036,7 +1046,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { ChainPos pos(i == 0 ? FRONT : MIDDLE); setUpChain(pos); invoker.setChainPos(pos); - MergeHandler handler(getEnv(), providerWrapper); + MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -1128,7 +1138,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { spi::Timestamp ts(10111); doPut(doc, ts); - MergeHandler handler(getEnv(), getPersistenceProvider()); + MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { api::ApplyBucketDiffCommand::Entry e; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index ec71aee7eed..16a23b5f5a7 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -14,11 +14,13 @@ LOG_SETUP(".persistence.mergehandler"); namespace storage { -MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize, +MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, + const vespalib::string & clusterName, const framework::Clock & clock, + uint32_t maxChunkSize, bool enableMergeLocalNodeChooseDocsOptimalization, uint32_t commonMergeChainOptimalizationMinimumSize) - : _clock(env._component.getClock()), - _clusterName(env._component.getClusterName()), + : _clock(clock), + _clusterName(clusterName), _env(env), _spi(spi), _maxChunkSize(maxChunkSize), diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index af2f765aed5..830fb20c8d9 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -35,6 +35,7 @@ public: }; MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, + const vespalib::string & clusterName, const framework::Clock & clock, uint32_t maxChunkSize = 4190208, bool enableMergeLocalNodeChooseDocsOptimalization = true, uint32_t commonMergeChainOptimalizationMinimumSize = 64); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 239a133fd02..d4508b406ce 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -17,7 +17,8 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, cfg.bucketMergeChunkSize, + _mergeHandler(_env, provider, component.getClusterName(), _clock, + cfg.bucketMergeChunkSize, cfg.enableMergeLocalNodeChooseDocsOptimalization, cfg.commonMergeChainOptimalizationMinimumSize), _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()), diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 14b6bced8a7..44a3e631e0b 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -17,8 +17,8 @@ public: MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const; MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const; private: - const PersistenceUtil& _env; - spi::PersistenceProvider& _spi; + const PersistenceUtil & _env; + spi::PersistenceProvider & _spi; }; } // storage diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index f7c72859f78..79828679731 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -30,8 +30,8 @@ public: MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - const PersistenceUtil& _env; - spi::PersistenceProvider& _spi; + const PersistenceUtil & _env; + spi::PersistenceProvider & _spi; }; } // storage |