summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def4
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp3
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp85
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h1
11 files changed, 38 insertions, 102 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 1aaa7f71f8e..2b1105803e2 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -43,10 +43,6 @@ common_merge_chain_optimalization_minimum_size int default=64 restart
## Note that this will gradually be increased to reach stor-distributormanager:splitsize which is currently at 32M
bucket_merge_chunk_size int default=33554432 restart
-## If set, portions of apply bucket diff handling will be performed asynchronously
-## with persistence thread not waiting for local writes to complete.
-async_apply_bucket_diff bool default=true
-
## When merging, it is possible to send more metadata than needed in order to
## let local nodes in merge decide which entries fits best to add this time
## based on disk location. Toggle this option on to use it. Note that memory
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
index c1dbe9b2bd2..beb1e0bd6bc 100644
--- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -465,7 +465,6 @@ App::usage()
"USAGE:\n";
std::cerr <<
"vespa-redistribute-bm\n"
- "[--async-apply-bucket-diff]\n"
"[--bucket-db-stripe-bits bits]\n"
"[--client-threads threads]\n"
"[--distributor-merge-busy-wait distributor-merge-busy-wait]\n"
@@ -502,7 +501,6 @@ App::get_options()
const char *opt_argument = nullptr;
int long_opt_index = 0;
static struct option long_opts[] = {
- { "async-apply-bucket-diff", 0, nullptr, 0 },
{ "bucket-db-stripe-bits", 1, nullptr, 0 },
{ "client-threads", 1, nullptr, 0 },
{ "distributor-merge-busy-wait", 1, nullptr, 0 },
@@ -533,7 +531,6 @@ App::get_options()
{ nullptr, 0, nullptr, 0 }
};
enum longopts_enum {
- LONGOPT_ASYNC_APPLY_BUCKET_DIFF,
LONGOPT_BUCKET_DB_STRIPE_BITS,
LONGOPT_CLIENT_THREADS,
LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT,
@@ -568,9 +565,6 @@ App::get_options()
switch (c) {
case 0:
switch(long_opt_index) {
- case LONGOPT_ASYNC_APPLY_BUCKET_DIFF:
- _bm_params.set_async_apply_bucket_diff(true);
- break;
case LONGOPT_BUCKET_DB_STRIPE_BITS:
_bm_params.set_bucket_db_stripe_bits(atoi(opt_argument));
break;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
index 4a3466f1a51..3ff10b19164 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
@@ -6,8 +6,7 @@
namespace search::bmcluster {
BmClusterParams::BmClusterParams()
- : _async_apply_bucket_diff(),
- _bucket_db_stripe_bits(4),
+ : _bucket_db_stripe_bits(4),
_disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def
_distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def
_distributor_stripes(0),
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
index 36b4c22f6a8..d365a28b0b6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -13,7 +13,6 @@ namespace search::bmcluster {
*/
class BmClusterParams
{
- std::optional<bool> _async_apply_bucket_diff;
uint32_t _bucket_db_stripe_bits;
bool _disable_queue_limits_for_chained_merges;
uint32_t _distributor_merge_busy_wait;
@@ -45,7 +44,6 @@ class BmClusterParams
public:
BmClusterParams();
~BmClusterParams();
- const std::optional<bool>& get_async_apply_bucket_diff() const noexcept { return _async_apply_bucket_diff; }
uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; }
uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; }
@@ -75,7 +73,6 @@ public:
bool needs_distributor() const { return _enable_distributor || _use_document_api; }
bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
- void set_async_apply_bucket_diff(bool value) { _async_apply_bucket_diff = value; }
void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; }
void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index db2060bacf7..3587e8008f2 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -382,9 +382,6 @@ struct ServiceLayerConfigSet : public StorageConfigSet
stor_bucket_init(),
stor_visitor()
{
- if (params.get_async_apply_bucket_diff().has_value()) {
- stor_filestor.asyncApplyBucketDiff = params.get_async_apply_bucket_diff().value();
- }
stor_filestor.numResponseThreads = params.get_response_threads();
stor_filestor.numNetworkThreads = params.get_rpc_network_threads();
stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule();
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 6288f86993d..e9d399d357f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -21,11 +21,9 @@ using namespace ::testing;
namespace storage {
/*
- * Class for testing merge handler taking async_apply_bucket_diff as
- * parameter for the test.
+ * Class for testing merge handler.
*/
-struct MergeHandlerTest : PersistenceTestUtils,
- public testing::WithParamInterface<bool> {
+struct MergeHandlerTest : PersistenceTestUtils {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -172,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -232,7 +230,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_P(MergeHandlerTest, merge_bucket_command) {
+TEST_F(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -293,11 +291,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -344,17 +342,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_P(MergeHandlerTest, master_message_flow) {
+TEST_F(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -448,7 +446,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -512,7 +510,7 @@ TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -548,7 +546,7 @@ TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_P(MergeHandlerTest, max_timestamp) {
+TEST_F(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -656,7 +654,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_P(MergeHandlerTest, spi_flush_guard) {
+TEST_F(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -671,16 +669,14 @@ TEST_P(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -688,7 +684,7 @@ TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -711,7 +707,7 @@ TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -743,7 +739,7 @@ TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -799,9 +795,7 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
- if (GetParam()) {
- convert_delayed_error_to_exception(handler);
- }
+ convert_delayed_error_to_exception(handler);
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -870,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -901,7 +895,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -933,7 +927,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -998,7 +992,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1073,9 +1067,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
- if (test.GetParam()) {
- convert_delayed_error_to_exception(test, handler);
- }
+ convert_delayed_error_to_exception(test, handler);
}
std::string
@@ -1099,7 +1091,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1126,7 +1118,7 @@ TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_P(MergeHandlerTest, remove_from_diff) {
+TEST_F(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1192,7 +1184,7 @@ TEST_P(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1216,15 +1208,10 @@ TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- if (GetParam()) {
- ASSERT_FALSE(tracker);
- handler.drain_async_writes();
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
- ASSERT_TRUE(applyBucketDiffReply.get());
- } else {
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
- }
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
tracker.reset();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
@@ -1314,7 +1301,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1446,14 +1433,10 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
- if (GetParam()) {
- handler.drain_async_writes();
- }
+ handler.drain_async_writes();
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
}
-VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
-
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index f5b9da0e1f5..4a215c7a348 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -226,11 +226,6 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
- } else {
- std::lock_guard guard(_lock);
- for (auto& handler : _persistenceHandlers) {
- handler->configure(*config);
- }
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7dcf4bcbee2..4a5362f3d8d 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -27,8 +27,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize,
- bool async_apply_bucket_diff)
+ uint32_t commonMergeChainOptimalizationMinimumSize)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
@@ -37,7 +36,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff),
_executor(executor)
{
}
@@ -1281,9 +1279,6 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
- if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
- check_apply_diff_sync(std::move(async_results));
- }
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1388,9 +1383,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
if (applyDiffHasLocallyNeededData(diff, index)) {
async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
- if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
- check_apply_diff_sync(std::move(async_results));
- }
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1481,12 +1473,6 @@ MergeHandler::drain_async_writes()
}
void
-MergeHandler::configure(bool async_apply_bucket_diff) noexcept
-{
- _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
-}
-
-void
MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
{
auto bucket_id = state->get_bucket().getBucketId();
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 1007f35c241..e1c821aab48 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -50,8 +50,7 @@ public:
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64,
- bool async_apply_bucket_diff = false);
+ uint32_t commonMergeChainOptimalizationMinimumSize = 64);
~MergeHandler() override;
@@ -79,7 +78,6 @@ public:
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
- void configure(bool async_apply_bucket_diff) noexcept;
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
@@ -91,7 +89,6 @@ private:
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
- std::atomic<bool> _async_apply_bucket_diff;
vespalib::ISequencedTaskExecutor& _executor;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index c0c95ffd7af..569da873cea 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,8 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_processAllHandler(_env, provider),
_mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize,
- cfg.asyncApplyBucketDiff),
+ cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
@@ -171,10 +170,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
-void
-PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept
-{
- _mergeHandler.configure(config.asyncApplyBucketDiff);
-}
-
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index c60fb05e56e..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -35,7 +35,6 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
- void configure(vespa::config::content::StorFilestorConfig& config) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;