summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-18 11:27:37 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-18 11:27:37 +0100
commit93a5928d69b92db5d90f721d8eaa7bda26b5a805 (patch)
tree99b3f765fc7ce87ecc977c59b509472d87565905 /storage
parent9de6b0dc0fcc736112b17a35a3de053bd4039913 (diff)
Remove sync apply bucket diff.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp85
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h1
6 files changed, 37 insertions, 84 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 6288f86993d..e9d399d357f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -21,11 +21,9 @@ using namespace ::testing;
namespace storage {
/*
- * Class for testing merge handler taking async_apply_bucket_diff as
- * parameter for the test.
+ * Class for testing merge handler.
*/
-struct MergeHandlerTest : PersistenceTestUtils,
- public testing::WithParamInterface<bool> {
+struct MergeHandlerTest : PersistenceTestUtils {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_P(MergeHandlerTest, merge_bucket_command) {
+TEST_F(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_P(MergeHandlerTest, master_message_flow) {
+TEST_F(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_P(MergeHandlerTest, max_timestamp) {
+TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_P(MergeHandlerTest, spi_flush_guard) {
+TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
- if (test.GetParam()) {
- convert_delayed_error_to_exception(test, handler);
- }
+ convert_delayed_error_to_exception(test, handler);
}
std::string
@@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_P(MergeHandlerTest, remove_from_diff) {
+TEST_F(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- if (GetParam()) {
- ASSERT_FALSE(tracker);
- handler.drain_async_writes();
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
- ASSERT_TRUE(applyBucketDiffReply.get());
- } else {
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
- }
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
tracker.reset();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
- if (GetParam()) {
- handler.drain_async_writes();
- }
+ handler.drain_async_writes();
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
}
-VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
-
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index f5b9da0e1f5..4a215c7a348 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -226,11 +226,6 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
- } else {
- std::lock_guard guard(_lock);
- for (auto& handler : _persistenceHandlers) {
- handler->configure(*config);
- }
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7dcf4bcbee2..4a5362f3d8d 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -27,8 +27,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize,
- bool async_apply_bucket_diff)
+ uint32_t commonMergeChainOptimalizationMinimumSize)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
@@ -37,7 +36,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff),
_executor(executor)
{
}
@@ -1281,9 +1279,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
- if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
- check_apply_diff_sync(std::move(async_results));
- }
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1388,9 +1383,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
if (applyDiffHasLocallyNeededData(diff, index)) {
async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
- if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
- check_apply_diff_sync(std::move(async_results));
- }
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1481,12 +1473,6 @@ MergeHandler::drain_async_writes()
}
void
-MergeHandler::configure(bool async_apply_bucket_diff) noexcept
-{
- _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
-}
-
-void
MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
{
auto bucket_id = state->get_bucket().getBucketId();
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 1007f35c241..e1c821aab48 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -50,8 +50,7 @@ public:
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64,
- bool async_apply_bucket_diff = false);
+ uint32_t commonMergeChainOptimalizationMinimumSize = 64);
~MergeHandler() override;
@@ -79,7 +78,6 @@ public:
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
- void configure(bool async_apply_bucket_diff) noexcept;
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
@@ -91,7 +89,6 @@ private:
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
- std::atomic<bool> _async_apply_bucket_diff;
vespalib::ISequencedTaskExecutor& _executor;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index c0c95ffd7af..569da873cea 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,8 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_processAllHandler(_env, provider),
_mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize,
- cfg.asyncApplyBucketDiff),
+ cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
@@ -171,10 +170,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
-void
-PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept
-{
- _mergeHandler.configure(config.asyncApplyBucketDiff);
-}
-
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index c60fb05e56e..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -35,7 +35,6 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
- void configure(vespa::config::content::StorFilestorConfig& config) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;