diff options
Diffstat (limited to 'storage')
17 files changed, 74 insertions, 205 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index 45d8fab7061..6173a43e25e 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -689,7 +689,7 @@ public: static std::unique_ptr<lib::Distribution> default_grouped_distribution() { return std::make_unique<lib::Distribution>( - GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string( + lib::Distribution::ConfigWrapper(GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string( R"(redundancy 2 group[3] group[0].name "invalid" @@ -708,7 +708,7 @@ group[2].nodes[3] group[2].nodes[0].index 3 group[2].nodes[1].index 4 group[2].nodes[2].index 5 -)"))); +)")))); } static std::shared_ptr<lib::Distribution> derived_global_grouped_distribution() { diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp index 5631ec71e4d..774f90821fa 100644 --- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp +++ b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp @@ -37,7 +37,6 @@ initial_redundancy 0 ensure_primary_persisted true ready_copies 3 active_per_leaf_group true -distributor_auto_ownership_transfer_on_whole_group_down true group[0].index "invalid" group[0].name "invalid" group[0].capacity 1 @@ -48,7 +47,6 @@ group[0].nodes[1].index 1 group[0].nodes[1].retired false group[0].nodes[2].index 2 group[0].nodes[2].retired false -disk_distribution MODULO_BID )"); } @@ -92,7 +90,6 @@ initial_redundancy 0 ensure_primary_persisted true ready_copies 6 active_per_leaf_group true -distributor_auto_ownership_transfer_on_whole_group_down true group[0].index "invalid" group[0].name "invalid" group[0].capacity 1 @@ -117,7 +114,6 @@ group[2].nodes[1].index 4 group[2].nodes[1].retired false group[2].nodes[2].index 5 group[2].nodes[2].retired false -disk_distribution MODULO_BID )"); EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); } @@ -163,7 +159,6 @@ initial_redundancy 0 ensure_primary_persisted true ready_copies 4 active_per_leaf_group true -distributor_auto_ownership_transfer_on_whole_group_down true group[0].index "invalid" group[0].name "invalid" group[0].capacity 1 @@ -200,7 +195,6 @@ group[6].capacity 1 group[6].partitions "" group[6].nodes[0].index 3 group[6].nodes[0].retired false -disk_distribution MODULO_BID )"); EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); } @@ -234,7 +228,6 @@ initial_redundancy 0 ensure_primary_persisted true ready_copies 3 active_per_leaf_group true -distributor_auto_ownership_transfer_on_whole_group_down true group[0].index "invalid" group[0].name "invalid" group[0].capacity 1 @@ -253,7 +246,6 @@ group[2].capacity 1 group[2].partitions "" group[2].nodes[0].index 2 group[2].nodes[0].retired false -disk_distribution MODULO_BID )"); EXPECT_EQ(expected_global_config, default_to_global_config(default_config)); } diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index d21ecc814a5..51c0a75e45d 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -1580,18 +1580,6 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) { "distributor:6 .2.s:d storage:6")); } -TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) { - std::string config = dist_config_6_nodes_across_4_groups(); - config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; - set_distribution(config); - - // Group is down, but config says to not do anything about it. - EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1), - get_sent_nodes("distributor:6 storage:6", - "distributor:6 .2.s:d .3.s:d storage:6")); -} - - namespace { void diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index f79235ae505..4bd0570efa8 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -170,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils { MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208); } std::shared_ptr<api::StorageMessage> get_queued_reply() { diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index a480ba2740f..cdf203b8a39 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -179,7 +179,7 @@ struct MergeThrottlerTest : Test { std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); + void receive_chained_merge_with_full_queue(bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { auto& queue = _throttlers[throttler_idx]->getMergeQueue(); @@ -1234,10 +1234,9 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist } void -MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain - _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits); size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { @@ -1269,21 +1268,13 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim } TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue()); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); } -TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false)); - - _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); - auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); - EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); -} - TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue()); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); @@ -1293,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) { // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too. - ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true)); + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); // Unordered merge is immediately forwarded to the next node _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp index ec606af0690..eb42f19a5e8 100644 --- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp +++ b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp @@ -4,7 +4,6 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/config/print/asciiconfigwriter.h> #include <vespa/config/print/asciiconfigreader.hpp> -#include <vespa/vespalib/util/stringfmt.h> #include <vespa/vdslib/distribution/distribution_config_util.h> #include <vespa/vespalib/stllike/asciistream.h> #include <cassert> @@ -22,9 +21,7 @@ struct Group { std::map<uint16_t, std::unique_ptr<Group>> sub_groups; }; -void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder, const DistributionConfig& source) { - builder.diskDistribution = source.diskDistribution; - builder.distributorAutoOwnershipTransferOnWholeGroupDown = true; +void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder) { builder.activePerLeafGroup = true; // TODO consider how to best support n-of-m replication for global docs builder.ensurePrimaryPersisted = true; @@ -155,7 +152,7 @@ void build_global_groups(DistributionConfigBuilder& builder, const DistributionC std::shared_ptr<DistributionConfig> GlobalBucketSpaceDistributionConverter::convert_to_global(const DistributionConfig& source) { DistributionConfigBuilder builder; - set_distribution_invariant_config_fields(builder, source); + set_distribution_invariant_config_fields(builder); build_global_groups(builder, source); return std::make_shared<DistributionConfig>(builder); } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 92ae38ea7c6..9e7fb600cae 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -18,6 +18,7 @@ mbus_distributor_node_max_pending_size int default=0 mbus_content_node_max_pending_size int default=0 # Minimum size of packets to compress (0 means no compression) +## TODO Common compression config for mbus and rpc, and consider ZSTD as default mbus.compress.limit int default=1024 restart ## Compression level for packets diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 927fe1db73b..3f6028d7fa1 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -161,3 +161,7 @@ enable_operation_cancellation bool default=false ## TODO GC very soon, it has no effect. priority_merge_out_of_sync_copies int default=120 + +## TODO GC as it has no effect +use_btree_database bool default=true + diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 26b8450ab20..8cd204bcf9f 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -79,14 +79,6 @@ merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648 ## "source only", as merges do not cause mutations on such nodes. resource_exhaustion_merge_back_pressure_duration_secs double default=30.0 -## If true, received merges that have already been accepted into the pending -## merge window on at least one node will not be restricted by the configured -## max_merge_queue_size limit. They will be allowed to enqueue regardless of -## the current queue size. This avoids wasting the time spent being accepted -## into merge windows, which would happen if the merge were to be bounced with -## a busy-reply that would subsequently be unwound through the entire merge chain. -disable_queue_limits_for_chained_merges bool default=true - ## Whether the deadlock detector should be enabled or not. If disabled, it will ## still run, but it will never actually abort the process it is running in. enable_dead_lock_detector bool default=false diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index aac16f8b618..19d66f629c5 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -303,9 +303,6 @@ PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown( const lib::ClusterState& state) const { const auto &dist(_bucket_space_state.get_distribution()); - if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { - return false; // Not doing anything for downed groups. - } const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); // If there is no group information associated with the node (because the // group has changed or the node has been removed from config), we must diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 90703050009..093c11fb913 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -25,7 +25,6 @@ #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/string_escape.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config/subscription/configuri.h> #include <vespa/config/helper/configfetcher.hpp> #include <thread> @@ -49,7 +48,7 @@ namespace { class BucketExecutorWrapper : public spi::BucketExecutor { public: - BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } + explicit BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override { _executor.execute(bucket, std::move(task)); @@ -213,7 +212,6 @@ FileStorManager::on_configure(const StorFilestorConfig& config) _use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule; _host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel); const bool use_dynamic_throttling = (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC); - const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps; if (!liveUpdate) { _config = std::make_unique<StorFilestorConfig>(config); @@ -243,11 +241,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config) // TODO remove once desired throttling behavior is set in stone { _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); - _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops); - std::lock_guard guard(_lock); - for (auto& ph : _persistenceHandlers) { - ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops); - } + _filestorHandler->set_throttle_apply_bucket_diff_ops(false); } } @@ -312,7 +306,7 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu if (results.size() > 1) { error << "Bucket was inconsistent with " << results.size() << " entries so no automatic remapping done:"; - BucketMap::const_iterator it = results.begin(); + auto it = results.begin(); for (uint32_t i=0; i <= 4 && it != results.end(); ++it, ++i) { error << " " << it->first; } @@ -551,10 +545,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) StorBucketDatabase::WrappedEntry -FileStorManager::ensureConsistentBucket( - const document::Bucket& bucket, - api::StorageMessage& msg, - const char* callerId) +FileStorManager::ensureConsistentBucket(const document::Bucket& bucket, api::StorageMessage& msg, const char* callerId) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), callerId, StorBucketDatabase::CREATE_IF_NONEXISTING)); @@ -565,7 +556,7 @@ FileStorManager::ensureConsistentBucket( entry.remove(); } replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split"); - return StorBucketDatabase::WrappedEntry(); + return {}; } return entry; @@ -899,7 +890,7 @@ FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) { return false; } - }; + } return true; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 1b7041583e8..7ee2d9f37bf 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -2,7 +2,6 @@ #include "mergehandler.h" #include "persistenceutil.h" -#include "shared_operation_throttler.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> @@ -28,17 +27,14 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, - uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize) + uint32_t maxChunkSize) : _clock(clock), _cluster_context(cluster_context), _env(env), _spi(spi), _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), - _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _executor(executor), - _throttle_merge_feed_ops(true) + _executor(executor) { } @@ -50,6 +46,8 @@ MergeHandler::~MergeHandler() namespace { +constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u; + constexpr int getDeleteFlag() { // Referred into old slotfile code before. Where should this number come from? return 2; @@ -177,7 +175,7 @@ MergeHandler::buildBucketInfoList( std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const { - assert(output.size() == 0); + assert(output.empty()); assert(myNodeIndex < 16); uint32_t oldSize = output.size(); using DbBucketInfo = api::BucketInfo; @@ -489,13 +487,12 @@ MergeHandler::fetchLocalData( } document::Document::UP -MergeHandler::deserializeDiffDocument( - const api::ApplyBucketDiffCommand::Entry& e, - const document::DocumentTypeRepo& repo) const +MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, + const document::DocumentTypeRepo& repo) const { auto doc = std::make_unique<document::Document>(); vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size()); - if (e._bodyBlob.size() > 0) { + if (!e._bodyBlob.empty()) { // TODO Remove this branch and add warning on error. vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size()); doc->deserialize(repo, hbuf, bbuf); @@ -511,8 +508,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const { - auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one() - : vespalib::SharedOperationThrottler::Token(); + auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry @@ -536,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results * Apply the diffs needed locally. */ void -MergeHandler::applyDiffLocally( - const spi::Bucket& bucket, - std::vector<api::ApplyBucketDiffCommand::Entry>& diff, - uint8_t nodeIndex, - spi::Context& context, - std::shared_ptr<ApplyBucketDiffState> async_results) const +MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + uint8_t nodeIndex, spi::Context& context, + const std::shared_ptr<ApplyBucketDiffState> & async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", - bucket.toString().c_str(), - diff.size()); + bucket.toString().c_str(), diff.size()); uint32_t nodeMask = 1 << nodeIndex; uint32_t byteCount = 0; uint32_t addedCount = 0; @@ -566,9 +558,8 @@ MergeHandler::applyDiffLocally( if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) { ++j; - LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and " - "already present in persistence", bucket.toString().c_str(), - existing.toString().c_str()); + LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence", + bucket.toString().c_str(), existing.toString().c_str()); continue; } if ((e._entry._hasMask & nodeMask) != 0) { @@ -579,8 +570,7 @@ MergeHandler::applyDiffLocally( } if (!e.filled()) { ++i; - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -599,19 +589,14 @@ MergeHandler::applyDiffLocally( ++i; ++j; if ((e._entry._flags & DELETED) && !existing.isRemove()) { - LOG(debug, "Slot in diff is remove for existing " - "timestamp in %s. Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); applyDiffEntry(async_results, bucket, e, repo); } else { // Duplicate put, just ignore it. - LOG(debug, "During diff apply, attempting to add slot " - "whose timestamp already exists in %s, but assuming " - "these are for the same entry--ignoring it. " - "Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, " + " but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); } continue; } @@ -626,8 +611,7 @@ MergeHandler::applyDiffLocally( continue; } if (!e.filled()) { - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -653,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket)); if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) { LOG(warning, "Failed to get bucket info for %s: %s", - bucket.toString().c_str(), - infoResult.getErrorMessage().c_str()); - throw std::runtime_error("Failed to invoke getBucketInfo on " - "persistence provider"); + bucket.toString().c_str(), infoResult.getErrorMessage().c_str()); + throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider"); } const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo()); - api::BucketInfo providerInfo(tmpInfo.getChecksum(), - tmpInfo.getDocumentCount(), - tmpInfo.getDocumentSize(), - tmpInfo.getEntryCount(), - tmpInfo.getUsedSize(), - tmpInfo.isReady(), - tmpInfo.isActive()); + api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(), + tmpInfo.getEntryCount(), tmpInfo.getUsedSize(), + tmpInfo.isReady(), tmpInfo.isActive()); _env.updateBucketDatabase(bucket.getBucket(), providerInfo); } @@ -701,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { LOG(warning, "Done with merge of %s (failed: %s) %s", - bucket.toString().c_str(), - status.reply->getResult().toString().c_str(), - status.toString().c_str()); + bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str()); return status.reply; } @@ -735,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index)); - findCandidates(status, - active_nodes_mask, - true, - 1 << (status.nodeList.size() - 1), - 1 << (nodes.size() - 1), - *cmd); - if (cmd->getDiff().size() != 0) { + findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1), + 1 << (nodes.size() - 1), *cmd); + if (!cmd->getDiff().empty()) { break; } cmd.reset(); @@ -751,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, active_nodes_mask = (1u << status.nodeList.size()) - 1; // If only one node left in the merge, return ok. if (status.nodeList.size() == 1) { - LOG(debug, "Done with merge of %s as there is only one node " - "that is not source only left in the merge.", + LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.", bucket.toString().c_str()); return status.reply; } @@ -780,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, if (e.first == 0u) { continue; } - if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize) - || counts.size() == 1) - { - LOG(spam, "Sending separate apply bucket diff for path %x " - "with size %u", + if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) { + LOG(spam, "Sending separate apply bucket diff for path %x with size %u", e.first, e.second); std::vector<api::MergeBucketCommand::Node> nodes; // This node always has to be first in chain. @@ -840,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, status.pendingId = cmd->getMsgId(); LOG(debug, "Sending %s", cmd->toString().c_str()); sender.sendCommand(cmd); - return api::StorageReply::SP(); + return {}; } /** Ensures merge states are deleted if we fail operation */ @@ -1206,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s->diff.size() == 0); + assert(s->diff.empty()); s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 43b51662fe6..f3bef802229 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -20,7 +20,6 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/storageframework/generic/clock/time.h> -#include <atomic> namespace vespalib { class ISequencedTaskExecutor; } namespace document { class Document; } @@ -52,26 +51,17 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, - uint32_t maxChunkSize = 4190208, - uint32_t commonMergeChainOptimalizationMinimumSize = 64); + uint32_t maxChunkSize = 4190208); ~MergeHandler() override; - bool buildBucketInfoList( - const spi::Bucket& bucket, - Timestamp maxTimestamp, - uint8_t myNodeIndex, - std::vector<api::GetBucketDiffCommand::Entry>& output, - spi::Context& context) const; - void fetchLocalData(const spi::Bucket& bucket, - std::vector<api::ApplyBucketDiffCommand::Entry>& diff, - uint8_t nodeIndex, - spi::Context& context) const; - void applyDiffLocally(const spi::Bucket& bucket, - std::vector<api::ApplyBucketDiffCommand::Entry>& diff, - uint8_t nodeIndex, - spi::Context& context, - std::shared_ptr<ApplyBucketDiffState> async_results) const; + bool buildBucketInfoList(const spi::Bucket& bucket, Timestamp maxTimestamp, uint8_t myNodeIndex, + std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const; + void fetchLocalData(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + uint8_t nodeIndex, spi::Context& context) const; + void applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + uint8_t nodeIndex, spi::Context& context, + const std::shared_ptr<ApplyBucketDiffState> & async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override; @@ -82,15 +72,6 @@ public: void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); - // Thread safe, as it's set during live reconfig from the main filestor manager. - void set_throttle_merge_feed_ops(bool throttle) noexcept { - _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed); - } - - [[nodiscard]] bool throttle_merge_feed_ops() const noexcept { - return _throttle_merge_feed_ops.load(std::memory_order_relaxed); - } - private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; const framework::Clock &_clock; @@ -99,36 +80,26 @@ private: spi::PersistenceProvider &_spi; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; - const uint32_t _commonMergeChainOptimalizationMinimumSize; vespalib::ISequencedTaskExecutor& _executor; - std::atomic<bool> _throttle_merge_feed_ops; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ - api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, - MergeStatus& status, - MessageSender& sender, - spi::Context& context, - std::shared_ptr<ApplyBucketDiffState>& async_results) const; + api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, + spi::Context& context, std::shared_ptr<ApplyBucketDiffState>& async_results) const; /** * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. */ - void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, - const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, - const document::DocumentTypeRepo& repo) const; + void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&, + const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, * sorted ascendingly on entry timestamp. * Throws std::runtime_error upon iteration failure. */ - void populateMetaData(const spi::Bucket&, - Timestamp maxTimestamp, - DocEntryList & entries, - spi::Context& context) const; + void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, DocEntryList & entries, spi::Context& context) const; std::unique_ptr<document::Document> deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 29d39845f5a..87c1f83794e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -19,9 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, - cfg.bucketMergeChunkSize, - cfg.commonMergeChainOptimalizationMinimumSize), + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize), _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider, component.getBucketIdFactory()) @@ -175,10 +173,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } -void -PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept -{ - _mergeHandler.set_throttle_merge_feed_ops(throttle); -} - } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index 595815d2bb3..1835b56528e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -36,8 +36,6 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } - - void set_throttle_merge_feed_ops(bool throttle) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index b99c238f9ab..c14bc6dc5eb 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -12,7 +12,6 @@ #include <vespa/messagebus/error.h> #include <vespa/config/common/exceptions.h> #include <vespa/config/helper/configfetcher.hpp> -#include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/string_escape.h> #include <vespa/vespalib/util/stringfmt.h> @@ -206,7 +205,6 @@ MergeThrottler::MergeThrottler( _active_merge_memory_used_bytes(0), _max_merge_memory_usage_bytes(0), // 0 ==> unlimited _use_dynamic_throttling(false), - _disable_queue_limits_for_chained_merges(false), _closing(false) { _throttlePolicy->setMinWindowSize(20); @@ -252,7 +250,6 @@ MergeThrottler::on_configure(const StorServerConfig& new_config) _maxQueueSize = new_config.maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); - _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) { _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes); } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) { @@ -437,8 +434,7 @@ MergeThrottler::enqueue_merge_for_later_processing( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued - const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor(); + const bool is_forwarded_merge = !mergeCmd.from_distributor(); _queue.emplace(msg, _queueSequence++, is_forwarded_merge); _metrics->queueSize.set(static_cast<int64_t>(_queue.size())); } @@ -767,8 +763,7 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co if (cmd.use_unordered_forwarding()) { return cmd.from_distributor(); } - return ((_queue.size() < _maxQueueSize) - || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor())); + return (_queue.size() < _maxQueueSize) || !cmd.from_distributor(); } // Must be run from worker thread @@ -1332,12 +1327,6 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } void -MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept { - std::lock_guard lock(_stateLock); - _disable_queue_limits_for_chained_merges = disable_limits; -} - -void MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept { std::lock_guard lock(_stateLock); _max_merge_memory_usage_bytes = max_memory_bytes; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index e210a8bfb8b..1e791136476 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -185,7 +185,6 @@ private: size_t _active_merge_memory_used_bytes; size_t _max_merge_memory_usage_bytes; bool _use_dynamic_throttling; - bool _disable_queue_limits_for_chained_merges; bool _closing; public: /** @@ -227,7 +226,6 @@ public: // For unit testing only const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } - void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept; void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept; [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept; void set_hw_info_locking(const vespalib::HwInfo& hw_info); |