summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp46
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h4
6 files changed, 40 insertions, 26 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 40b0d8eb2ba..8cf9a7ac661 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -154,6 +154,15 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
PersistenceProviderWrapper& providerWrapper,
HandlerInvoker& invoker,
const ExpectedExceptionSpec& spec);
+
+ MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
+ return MergeHandler(getEnv(), getPersistenceProvider(),
+ getEnv()._component.getClusterName(), getEnv()._component.getClock(), maxChunkSize);
+ }
+ MergeHandler createHandler(spi::PersistenceProvider & spi) {
+ return MergeHandler(getEnv(), spi,
+ getEnv()._component.getClusterName(), getEnv()._component.getClock());
+ }
};
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::HandleGetBucketDiffReplyInvoker() = default;
@@ -199,7 +208,8 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
TEST_F(MergeHandlerTest, merge_bucket_command) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler(getEnv(), getPersistenceProvider(),
+ getEnv()._component.getClusterName(), getEnv()._component.getClock());
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -224,7 +234,7 @@ void
MergeHandlerTest::testGetBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Verifying that get bucket diff is sent on");
auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -273,7 +283,7 @@ void
MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
{
setUpChain(midChain ? MIDDLE : BACK);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Verifying that apply bucket diff is sent on");
auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
@@ -320,7 +330,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
TEST_F(MergeHandlerTest, master_message_flow) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -421,7 +431,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
doPut(1234, spi::Timestamp(4000 + i), docSize, docSize);
}
- MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
+ MergeHandler handler = createHandler(maxChunkSize);
LOG(debug, "Handle a merge bucket command");
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -504,7 +514,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
applyBucketDiffCmd->getDiff() = applyDiff;
- MergeHandler handler(getEnv(), getPersistenceProvider(), maxChunkSize);
+ MergeHandler handler = createHandler(maxChunkSize);
handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
@@ -516,7 +526,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -624,7 +634,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -644,7 +654,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
}
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -652,7 +662,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
}
TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -675,7 +685,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
}
TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
_nodes.emplace_back(1, false);
@@ -707,7 +717,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
}
TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
@@ -815,7 +825,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -847,7 +857,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -880,7 +890,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
@@ -945,7 +955,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
HandleGetBucketDiffReplyInvoker invoker;
@@ -1036,7 +1046,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
ChainPos pos(i == 0 ? FRONT : MIDDLE);
setUpChain(pos);
invoker.setChainPos(pos);
- MergeHandler handler(getEnv(), providerWrapper);
+ MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -1128,7 +1138,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
spi::Timestamp ts(10111);
doPut(doc, ts);
- MergeHandler handler(getEnv(), getPersistenceProvider());
+ MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
api::ApplyBucketDiffCommand::Entry e;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index ec71aee7eed..16a23b5f5a7 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -14,11 +14,13 @@ LOG_SETUP(".persistence.mergehandler");
namespace storage {
-MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, uint32_t maxChunkSize,
+MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
+ const vespalib::string & clusterName, const framework::Clock & clock,
+ uint32_t maxChunkSize,
bool enableMergeLocalNodeChooseDocsOptimalization,
uint32_t commonMergeChainOptimalizationMinimumSize)
- : _clock(env._component.getClock()),
- _clusterName(env._component.getClusterName()),
+ : _clock(clock),
+ _clusterName(clusterName),
_env(env),
_spi(spi),
_maxChunkSize(maxChunkSize),
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index af2f765aed5..830fb20c8d9 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -35,6 +35,7 @@ public:
};
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
+ const vespalib::string & clusterName, const framework::Clock & clock,
uint32_t maxChunkSize = 4190208,
bool enableMergeLocalNodeChooseDocsOptimalization = true,
uint32_t commonMergeChainOptimalizationMinimumSize = 64);
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 239a133fd02..d4508b406ce 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -17,7 +17,8 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, cfg.bucketMergeChunkSize,
+ _mergeHandler(_env, provider, component.getClusterName(), _clock,
+ cfg.bucketMergeChunkSize,
cfg.enableMergeLocalNodeChooseDocsOptimalization,
cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 14b6bced8a7..44a3e631e0b 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -17,8 +17,8 @@ public:
MessageTrackerUP handleRemoveLocation(api::RemoveLocationCommand&, MessageTrackerUP tracker) const;
MessageTrackerUP handleStatBucket(api::StatBucketCommand&, MessageTrackerUP tracker) const;
private:
- const PersistenceUtil& _env;
- spi::PersistenceProvider& _spi;
+ const PersistenceUtil & _env;
+ spi::PersistenceProvider & _spi;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index f7c72859f78..79828679731 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -30,8 +30,8 @@ public:
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- const PersistenceUtil& _env;
- spi::PersistenceProvider& _spi;
+ const PersistenceUtil & _env;
+ spi::PersistenceProvider & _spi;
};
} // storage