diff options
Diffstat (limited to 'storage/src')
6 files changed, 50 insertions, 3 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 0def395aff0..19012cfab18 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -64,7 +64,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, BucketManager::~BucketManager() { - if (_thread.get() != 0) { + if (_thread) { LOG(error, "BucketManager deleted without calling close() first"); onClose(); } @@ -75,9 +75,9 @@ BucketManager::~BucketManager() void BucketManager::onClose() { // Stop internal thread such that we don't send any more messages down. - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(_workerLock, _workerCond); - _thread.reset(0); + _thread.reset(); } StorageLinkQueued::onClose(); } diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index d2dd74693c8..341f0f39d8f 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -31,6 +31,8 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _minBucketsPerVisitor(5), _maxClusterClockSkew(0), _inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)), + _simulated_db_pruning_latency(0), + _simulated_db_merging_latency(0), _doInlineSplit(true), _enableJoinForSiblingLessBuckets(false), _enableInconsistentJoin(false), @@ -159,6 +161,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) { _inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec); } + _simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec)); + _simulated_db_merging_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec)); LOG(debug, "Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. " diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index a58767164a7..28a219dc3f6 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -229,6 +229,13 @@ public: return _inhibitMergeSendingOnBusyNodeDuration; } + std::chrono::milliseconds simulated_db_pruning_latency() const noexcept { + return _simulated_db_pruning_latency; + } + std::chrono::milliseconds simulated_db_merging_latency() const noexcept { + return _simulated_db_merging_latency; + } + bool getSequenceMutatingOperations() const noexcept { return _sequenceMutatingOperations; } @@ -276,6 +283,8 @@ private: MaintenancePriorities _maintenancePriorities; std::chrono::seconds _maxClusterClockSkew; std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration; + std::chrono::milliseconds _simulated_db_pruning_latency; + std::chrono::milliseconds _simulated_db_merging_latency; bool _doInlineSplit; bool _enableJoinForSiblingLessBuckets; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index d4f69073cc6..182aa2008c5 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -191,3 +191,11 @@ inhibit_merge_sending_on_busy_node_duration_sec int default=10 ## For this option to take effect, the cluster controller must also have two-phase ## states enabled. allow_stale_reads_during_cluster_state_transitions bool default=false + +## If greater than zero, injects a thread sleep into certain parts of the bucket +## processing logic. This allows for easier testing of racing edge cases where the +## main distributor thread is CPU-blocked processing large amounts of buckets, but +## without actually needing to use a lot of buckets in the test itself. +## Setting any of these values only makes sense for testing! +simulated_db_pruning_latency_msec int default=0 +simulated_db_merging_latency_msec int default=0 diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index c4c8dc439be..a901ac28a54 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -11,6 +11,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/vespalib/util/xmlstream.h> +#include <thread> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".distributor.bucketdb.updater"); @@ -168,9 +169,30 @@ BucketDBUpdater::removeSuperfluousBuckets( for (const auto& db_entry : proc.getNonOwnedEntries()) { readOnlyDb.update(db_entry); // TODO Entry move support } + maybe_inject_simulated_db_pruning_delay(); } } +namespace { + +void maybe_sleep_for(std::chrono::milliseconds ms) { + if (ms.count() > 0) { + std::this_thread::sleep_for(ms); + } +} + +} + +void +BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() { + maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency()); +} + +void +BucketDBUpdater::maybe_inject_simulated_db_merging_delay() { + maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency()); +} + void BucketDBUpdater::ensureTransitionTimerStarted() { @@ -603,6 +625,7 @@ BucketDBUpdater::activatePendingClusterState() framework::MilliSecTimer process_timer(_distributorComponent.getClock()); _pendingClusterState->mergeIntoBucketDatabases(); + maybe_inject_simulated_db_merging_delay(); if (_pendingClusterState->isVersionedTransition()) { LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 663ac488ef4..e69d328d8bc 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -179,6 +179,9 @@ private: void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&); void sendAllQueuedBucketRechecks(); + void maybe_inject_simulated_db_pruning_delay(); + void maybe_inject_simulated_db_merging_delay(); + friend class BucketDBUpdater_Test; friend class MergeOperation_Test; |