about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/bucketdb/bucketmanagertest.cpp 4
-rw-r--r-- storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp 8
-rw-r--r-- storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp 12
-rw-r--r-- storage/src/tests/persistence/mergehandlertest.cpp 4
-rw-r--r-- storage/src/tests/storageserver/mergethrottlertest.cpp 19
-rw-r--r-- storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp 7
-rw-r--r-- storage/src/vespa/storage/config/stor-communicationmanager.def 1
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def 4
-rw-r--r-- storage/src/vespa/storage/config/stor-server.def 8
-rw-r--r-- storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp 3
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 21
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.cpp 104
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.h 55
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.cpp 10
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.h 2
-rw-r--r-- storage/src/vespa/storage/storageserver/mergethrottler.cpp 15
-rw-r--r-- storage/src/vespa/storage/storageserver/mergethrottler.h 2
17 files changed, 74 insertions, 205 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index 45d8fab7061..6173a43e25e 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -689,7 +689,7 @@ public:
static std::unique_ptr<lib::Distribution> default_grouped_distribution() {
return std::make_unique<lib::Distribution>(
- GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string(
+ lib::Distribution::ConfigWrapper(GlobalBucketSpaceDistributionConverter::string_to_config(vespalib::string(
R"(redundancy 2
group[3]
group[0].name "invalid"
@@ -708,7 +708,7 @@ group[2].nodes[3]
group[2].nodes[0].index 3
group[2].nodes[1].index 4
group[2].nodes[2].index 5
-)")));
+)"))));
}
static std::shared_ptr<lib::Distribution> derived_global_grouped_distribution() {
diff --git a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
index 5631ec71e4d..774f90821fa 100644
--- a/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
+++ b/storage/src/tests/common/global_bucket_space_distribution_converter_test.cpp
@@ -37,7 +37,6 @@ initial_redundancy 0
ensure_primary_persisted true
ready_copies 3
active_per_leaf_group true
-distributor_auto_ownership_transfer_on_whole_group_down true
group[0].index "invalid"
group[0].name "invalid"
group[0].capacity 1
@@ -48,7 +47,6 @@ group[0].nodes[1].index 1
group[0].nodes[1].retired false
group[0].nodes[2].index 2
group[0].nodes[2].retired false
-disk_distribution MODULO_BID
)");
}
@@ -92,7 +90,6 @@ initial_redundancy 0
ensure_primary_persisted true
ready_copies 6
active_per_leaf_group true
-distributor_auto_ownership_transfer_on_whole_group_down true
group[0].index "invalid"
group[0].name "invalid"
group[0].capacity 1
@@ -117,7 +114,6 @@ group[2].nodes[1].index 4
group[2].nodes[1].retired false
group[2].nodes[2].index 5
group[2].nodes[2].retired false
-disk_distribution MODULO_BID
)");
EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
}
@@ -163,7 +159,6 @@ initial_redundancy 0
ensure_primary_persisted true
ready_copies 4
active_per_leaf_group true
-distributor_auto_ownership_transfer_on_whole_group_down true
group[0].index "invalid"
group[0].name "invalid"
group[0].capacity 1
@@ -200,7 +195,6 @@ group[6].capacity 1
group[6].partitions ""
group[6].nodes[0].index 3
group[6].nodes[0].retired false
-disk_distribution MODULO_BID
)");
EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
}
@@ -234,7 +228,6 @@ initial_redundancy 0
ensure_primary_persisted true
ready_copies 3
active_per_leaf_group true
-distributor_auto_ownership_transfer_on_whole_group_down true
group[0].index "invalid"
group[0].name "invalid"
group[0].capacity 1
@@ -253,7 +246,6 @@ group[2].capacity 1
group[2].partitions ""
group[2].nodes[0].index 2
group[2].nodes[0].retired false
-disk_distribution MODULO_BID
)");
EXPECT_EQ(expected_global_config, default_to_global_config(default_config));
}
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index d21ecc814a5..51c0a75e45d 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -1580,18 +1580,6 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
"distributor:6 .2.s:d storage:6"));
}
-TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) {
- std::string config = dist_config_6_nodes_across_4_groups();
- config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
- set_distribution(config);
-
- // Group is down, but config says to not do anything about it.
- EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1),
- get_sent_nodes("distributor:6 storage:6",
- "distributor:6 .2.s:d .3.s:d storage:6"));
-}
-
-
namespace {
void
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index f79235ae505..4bd0570efa8 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -170,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils {
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize);
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208);
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index a480ba2740f..cdf203b8a39 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -179,7 +179,7 @@ struct MergeThrottlerTest : Test {
std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
+ void receive_chained_merge_with_full_queue(bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1234,10 +1234,9 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
- _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits);
size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1269,21 +1268,13 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
}
TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue());
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
}
-TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false));
-
- _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
- auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
- EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
-}
-
TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) {
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue());
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
@@ -1293,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
// Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
- ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
// Unordered merge is immediately forwarded to the next node
_topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
diff --git a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp
index ec606af0690..eb42f19a5e8 100644
--- a/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp
+++ b/storage/src/vespa/storage/common/global_bucket_space_distribution_converter.cpp
@@ -4,7 +4,6 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/config/print/asciiconfigwriter.h>
#include <vespa/config/print/asciiconfigreader.hpp>
-#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vdslib/distribution/distribution_config_util.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <cassert>
@@ -22,9 +21,7 @@ struct Group {
std::map<uint16_t, std::unique_ptr<Group>> sub_groups;
};
-void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder, const DistributionConfig& source) {
- builder.diskDistribution = source.diskDistribution;
- builder.distributorAutoOwnershipTransferOnWholeGroupDown = true;
+void set_distribution_invariant_config_fields(DistributionConfigBuilder& builder) {
builder.activePerLeafGroup = true;
// TODO consider how to best support n-of-m replication for global docs
builder.ensurePrimaryPersisted = true;
@@ -155,7 +152,7 @@ void build_global_groups(DistributionConfigBuilder& builder, const DistributionC
std::shared_ptr<DistributionConfig>
GlobalBucketSpaceDistributionConverter::convert_to_global(const DistributionConfig& source) {
DistributionConfigBuilder builder;
- set_distribution_invariant_config_fields(builder, source);
+ set_distribution_invariant_config_fields(builder);
build_global_groups(builder, source);
return std::make_shared<DistributionConfig>(builder);
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 92ae38ea7c6..9e7fb600cae 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -18,6 +18,7 @@ mbus_distributor_node_max_pending_size int default=0
mbus_content_node_max_pending_size int default=0
# Minimum size of packets to compress (0 means no compression)
+## TODO Common compression config for mbus and rpc, and consider ZSTD as default
mbus.compress.limit int default=1024 restart
## Compression level for packets
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 927fe1db73b..3f6028d7fa1 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -161,3 +161,7 @@ enable_operation_cancellation bool default=false
## TODO GC very soon, it has no effect.
priority_merge_out_of_sync_copies int default=120
+
+## TODO GC as it has no effect
+use_btree_database bool default=true
+
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 26b8450ab20..8cd204bcf9f 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -79,14 +79,6 @@ merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648
## "source only", as merges do not cause mutations on such nodes.
resource_exhaustion_merge_back_pressure_duration_secs double default=30.0
-## If true, received merges that have already been accepted into the pending
-## merge window on at least one node will not be restricted by the configured
-## max_merge_queue_size limit. They will be allowed to enqueue regardless of
-## the current queue size. This avoids wasting the time spent being accepted
-## into merge windows, which would happen if the merge were to be bounced with
-## a busy-reply that would subsequently be unwound through the entire merge chain.
-disable_queue_limits_for_chained_merges bool default=true
-
## Whether the deadlock detector should be enabled or not. If disabled, it will
## still run, but it will never actually abort the process it is running in.
enable_dead_lock_detector bool default=false
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index aac16f8b618..19d66f629c5 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -303,9 +303,6 @@ PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown(
const lib::ClusterState& state) const
{
const auto &dist(_bucket_space_state.get_distribution());
- if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) {
- return false; // Not doing anything for downed groups.
- }
const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex));
// If there is no group information associated with the node (because the
// group has changed or the node has been removed from config), we must
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 90703050009..093c11fb913 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -25,7 +25,6 @@
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/string_escape.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/config/subscription/configuri.h>
#include <vespa/config/helper/configfetcher.hpp>
#include <thread>
@@ -49,7 +48,7 @@ namespace {
class BucketExecutorWrapper : public spi::BucketExecutor {
public:
- BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { }
+ explicit BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { }
void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override {
_executor.execute(bucket, std::move(task));
@@ -213,7 +212,6 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
_use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule;
_host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel);
const bool use_dynamic_throttling = (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC);
- const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps;
if (!liveUpdate) {
_config = std::make_unique<StorFilestorConfig>(config);
@@ -243,11 +241,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
// TODO remove once desired throttling behavior is set in stone
{
_filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
- _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
- std::lock_guard guard(_lock);
- for (auto& ph : _persistenceHandlers) {
- ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops);
- }
+ _filestorHandler->set_throttle_apply_bucket_diff_ops(false);
}
}
@@ -312,7 +306,7 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu
if (results.size() > 1) {
error << "Bucket was inconsistent with " << results.size()
<< " entries so no automatic remapping done:";
- BucketMap::const_iterator it = results.begin();
+ auto it = results.begin();
for (uint32_t i=0; i <= 4 && it != results.end(); ++it, ++i) {
error << " " << it->first;
}
@@ -551,10 +545,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
StorBucketDatabase::WrappedEntry
-FileStorManager::ensureConsistentBucket(
- const document::Bucket& bucket,
- api::StorageMessage& msg,
- const char* callerId)
+FileStorManager::ensureConsistentBucket(const document::Bucket& bucket, api::StorageMessage& msg, const char* callerId)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), callerId, StorBucketDatabase::CREATE_IF_NONEXISTING));
@@ -565,7 +556,7 @@ FileStorManager::ensureConsistentBucket(
entry.remove();
}
replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split");
- return StorBucketDatabase::WrappedEntry();
+ return {};
}
return entry;
@@ -899,7 +890,7 @@ FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept
if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) {
return false;
}
- };
+ }
return true;
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 1b7041583e8..7ee2d9f37bf 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,7 +2,6 @@
#include "mergehandler.h"
#include "persistenceutil.h"
-#include "shared_operation_throttler.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
@@ -28,17 +27,14 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
- uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize)
+ uint32_t maxChunkSize)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
_spi(spi),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
- _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _executor(executor),
- _throttle_merge_feed_ops(true)
+ _executor(executor)
{
}
@@ -50,6 +46,8 @@ MergeHandler::~MergeHandler()
namespace {
+constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;
+
constexpr int getDeleteFlag() {
// Referred into old slotfile code before. Where should this number come from?
return 2;
@@ -177,7 +175,7 @@ MergeHandler::buildBucketInfoList(
std::vector<api::GetBucketDiffCommand::Entry>& output,
spi::Context& context) const
{
- assert(output.size() == 0);
+ assert(output.empty());
assert(myNodeIndex < 16);
uint32_t oldSize = output.size();
using DbBucketInfo = api::BucketInfo;
@@ -489,13 +487,12 @@ MergeHandler::fetchLocalData(
}
document::Document::UP
-MergeHandler::deserializeDiffDocument(
- const api::ApplyBucketDiffCommand::Entry& e,
- const document::DocumentTypeRepo& repo) const
+MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e,
+ const document::DocumentTypeRepo& repo) const
{
auto doc = std::make_unique<document::Document>();
vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size());
- if (e._bodyBlob.size() > 0) {
+ if (!e._bodyBlob.empty()) {
// TODO Remove this branch and add warning on error.
vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size());
doc->deserialize(repo, hbuf, bbuf);
@@ -511,8 +508,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
const api::ApplyBucketDiffCommand::Entry& e,
const document::DocumentTypeRepo& repo) const
{
- auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one()
- : vespalib::SharedOperationThrottler::Token();
+ auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
@@ -536,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
* Apply the diffs needed locally.
*/
void
-MergeHandler::applyDiffLocally(
- const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState> async_results) const
+MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context,
+ const std::shared_ptr<ApplyBucketDiffState> & async_results) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
- bucket.toString().c_str(),
- diff.size());
+ bucket.toString().c_str(), diff.size());
uint32_t nodeMask = 1 << nodeIndex;
uint32_t byteCount = 0;
uint32_t addedCount = 0;
@@ -566,9 +558,8 @@ MergeHandler::applyDiffLocally(
if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) {
++j;
- LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and "
- "already present in persistence", bucket.toString().c_str(),
- existing.toString().c_str());
+ LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence",
+ bucket.toString().c_str(), existing.toString().c_str());
continue;
}
if ((e._entry._hasMask & nodeMask) != 0) {
@@ -579,8 +570,7 @@ MergeHandler::applyDiffLocally(
}
if (!e.filled()) {
++i;
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -599,19 +589,14 @@ MergeHandler::applyDiffLocally(
++i;
++j;
if ((e._entry._flags & DELETED) && !existing.isRemove()) {
- LOG(debug, "Slot in diff is remove for existing "
- "timestamp in %s. Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
applyDiffEntry(async_results, bucket, e, repo);
} else {
// Duplicate put, just ignore it.
- LOG(debug, "During diff apply, attempting to add slot "
- "whose timestamp already exists in %s, but assuming "
- "these are for the same entry--ignoring it. "
- "Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
+ "but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
}
continue;
}
@@ -626,8 +611,7 @@ MergeHandler::applyDiffLocally(
continue;
}
if (!e.filled()) {
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -653,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
LOG(warning, "Failed to get bucket info for %s: %s",
- bucket.toString().c_str(),
- infoResult.getErrorMessage().c_str());
- throw std::runtime_error("Failed to invoke getBucketInfo on "
- "persistence provider");
+ bucket.toString().c_str(), infoResult.getErrorMessage().c_str());
+ throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider");
}
const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo());
- api::BucketInfo providerInfo(tmpInfo.getChecksum(),
- tmpInfo.getDocumentCount(),
- tmpInfo.getDocumentSize(),
- tmpInfo.getEntryCount(),
- tmpInfo.getUsedSize(),
- tmpInfo.isReady(),
- tmpInfo.isActive());
+ api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(),
+ tmpInfo.getEntryCount(), tmpInfo.getUsedSize(),
+ tmpInfo.isReady(), tmpInfo.isActive());
_env.updateBucketDatabase(bucket.getBucket(), providerInfo);
}
@@ -701,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
LOG(warning, "Done with merge of %s (failed: %s) %s",
- bucket.toString().c_str(),
- status.reply->getResult().toString().c_str(),
- status.toString().c_str());
+ bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str());
return status.reply;
}
@@ -735,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index));
- findCandidates(status,
- active_nodes_mask,
- true,
- 1 << (status.nodeList.size() - 1),
- 1 << (nodes.size() - 1),
- *cmd);
- if (cmd->getDiff().size() != 0) {
+ findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1),
+ 1 << (nodes.size() - 1), *cmd);
+ if (!cmd->getDiff().empty()) {
break;
}
cmd.reset();
@@ -751,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
active_nodes_mask = (1u << status.nodeList.size()) - 1;
// If only one node left in the merge, return ok.
if (status.nodeList.size() == 1) {
- LOG(debug, "Done with merge of %s as there is only one node "
- "that is not source only left in the merge.",
+ LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.",
bucket.toString().c_str());
return status.reply;
}
@@ -780,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
if (e.first == 0u) {
continue;
}
- if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
- || counts.size() == 1)
- {
- LOG(spam, "Sending separate apply bucket diff for path %x "
- "with size %u",
+ if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) {
+ LOG(spam, "Sending separate apply bucket diff for path %x with size %u",
e.first, e.second);
std::vector<api::MergeBucketCommand::Node> nodes;
// This node always has to be first in chain.
@@ -840,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
status.pendingId = cmd->getMsgId();
LOG(debug, "Sending %s", cmd->toString().c_str());
sender.sendCommand(cmd);
- return api::StorageReply::SP();
+ return {};
}
/** Ensures merge states are deleted if we fail operation */
@@ -1206,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s->diff.size() == 0);
+ assert(s->diff.empty());
s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 43b51662fe6..f3bef802229 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -20,7 +20,6 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/storageframework/generic/clock/time.h>
-#include <atomic>
namespace vespalib { class ISequencedTaskExecutor; }
namespace document { class Document; }
@@ -52,26 +51,17 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
- uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64);
+ uint32_t maxChunkSize = 4190208);
~MergeHandler() override;
- bool buildBucketInfoList(
- const spi::Bucket& bucket,
- Timestamp maxTimestamp,
- uint8_t myNodeIndex,
- std::vector<api::GetBucketDiffCommand::Entry>& output,
- spi::Context& context) const;
- void fetchLocalData(const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context) const;
- void applyDiffLocally(const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState> async_results) const;
+ bool buildBucketInfoList(const spi::Bucket& bucket, Timestamp maxTimestamp, uint8_t myNodeIndex,
+ std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const;
+ void fetchLocalData(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context) const;
+ void applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context,
+ const std::shared_ptr<ApplyBucketDiffState> & async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
@@ -82,15 +72,6 @@ public:
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
- // Thread safe, as it's set during live reconfig from the main filestor manager.
- void set_throttle_merge_feed_ops(bool throttle) noexcept {
- _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed);
- }
-
- [[nodiscard]] bool throttle_merge_feed_ops() const noexcept {
- return _throttle_merge_feed_ops.load(std::memory_order_relaxed);
- }
-
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
const framework::Clock &_clock;
@@ -99,36 +80,26 @@ private:
spi::PersistenceProvider &_spi;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
- const uint32_t _commonMergeChainOptimalizationMinimumSize;
vespalib::ISequencedTaskExecutor& _executor;
- std::atomic<bool> _throttle_merge_feed_ops;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
- api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
- MergeStatus& status,
- MessageSender& sender,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState>& async_results) const;
+ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender,
+ spi::Context& context, std::shared_ptr<ApplyBucketDiffState>& async_results) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
*/
- void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
- const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&,
- const document::DocumentTypeRepo& repo) const;
+ void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&,
+ const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,
* sorted ascendingly on entry timestamp.
* Throws std::runtime_error upon iteration failure.
*/
- void populateMetaData(const spi::Bucket&,
- Timestamp maxTimestamp,
- DocEntryList & entries,
- spi::Context& context) const;
+ void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, DocEntryList & entries, spi::Context& context) const;
std::unique_ptr<document::Document>
deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 29d39845f5a..87c1f83794e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,9 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
- cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize),
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize),
_asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider, component.getBucketIdFactory())
@@ -175,10 +173,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
-void
-PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept
-{
- _mergeHandler.set_throttle_merge_feed_ops(throttle);
-}
-
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index 595815d2bb3..1835b56528e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -36,8 +36,6 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
-
- void set_throttle_merge_feed_ops(bool throttle) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index b99c238f9ab..c14bc6dc5eb 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -12,7 +12,6 @@
#include <vespa/messagebus/error.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/config/helper/configfetcher.hpp>
-#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/string_escape.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -206,7 +205,6 @@ MergeThrottler::MergeThrottler(
_active_merge_memory_used_bytes(0),
_max_merge_memory_usage_bytes(0), // 0 ==> unlimited
_use_dynamic_throttling(false),
- _disable_queue_limits_for_chained_merges(false),
_closing(false)
{
_throttlePolicy->setMinWindowSize(20);
@@ -252,7 +250,6 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_maxQueueSize = new_config.maxMergeQueueSize;
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
- _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) {
_max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes);
} else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) {
@@ -437,8 +434,7 @@ MergeThrottler::enqueue_merge_for_later_processing(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued
- const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor();
+ const bool is_forwarded_merge = !mergeCmd.from_distributor();
_queue.emplace(msg, _queueSequence++, is_forwarded_merge);
_metrics->queueSize.set(static_cast<int64_t>(_queue.size()));
}
@@ -767,8 +763,7 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co
if (cmd.use_unordered_forwarding()) {
return cmd.from_distributor();
}
- return ((_queue.size() < _maxQueueSize)
- || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor()));
+ return (_queue.size() < _maxQueueSize) || !cmd.from_distributor();
}
// Must be run from worker thread
@@ -1332,12 +1327,6 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
-MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept {
- std::lock_guard lock(_stateLock);
- _disable_queue_limits_for_chained_merges = disable_limits;
-}
-
-void
MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept {
std::lock_guard lock(_stateLock);
_max_merge_memory_usage_bytes = max_memory_bytes;
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index e210a8bfb8b..1e791136476 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -185,7 +185,6 @@ private:
size_t _active_merge_memory_used_bytes;
size_t _max_merge_memory_usage_bytes;
bool _use_dynamic_throttling;
- bool _disable_queue_limits_for_chained_merges;
bool _closing;
public:
/**
@@ -227,7 +226,6 @@ public:
// For unit testing only
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
- void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept;
void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept;
[[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept;
void set_hw_info_locking(const vespalib::HwInfo& hw_info);