summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp6
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp4
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h9
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def8
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp23
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h3
6 files changed, 50 insertions, 3 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 0def395aff0..19012cfab18 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -64,7 +64,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
BucketManager::~BucketManager()
{
- if (_thread.get() != 0) {
+ if (_thread) {
LOG(error, "BucketManager deleted without calling close() first");
onClose();
}
@@ -75,9 +75,9 @@ BucketManager::~BucketManager()
void BucketManager::onClose()
{
// Stop internal thread such that we don't send any more messages down.
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interruptAndJoin(_workerLock, _workerCond);
- _thread.reset(0);
+ _thread.reset();
}
StorageLinkQueued::onClose();
}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index d2dd74693c8..341f0f39d8f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -31,6 +31,8 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_minBucketsPerVisitor(5),
_maxClusterClockSkew(0),
_inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)),
+ _simulated_db_pruning_latency(0),
+ _simulated_db_merging_latency(0),
_doInlineSplit(true),
_enableJoinForSiblingLessBuckets(false),
_enableInconsistentJoin(false),
@@ -159,6 +161,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) {
_inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec);
}
+ _simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec));
+ _simulated_db_merging_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec));
LOG(debug,
"Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. "
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index a58767164a7..28a219dc3f6 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -229,6 +229,13 @@ public:
return _inhibitMergeSendingOnBusyNodeDuration;
}
+ std::chrono::milliseconds simulated_db_pruning_latency() const noexcept {
+ return _simulated_db_pruning_latency;
+ }
+ std::chrono::milliseconds simulated_db_merging_latency() const noexcept {
+ return _simulated_db_merging_latency;
+ }
+
bool getSequenceMutatingOperations() const noexcept {
return _sequenceMutatingOperations;
}
@@ -276,6 +283,8 @@ private:
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
+ std::chrono::milliseconds _simulated_db_pruning_latency;
+ std::chrono::milliseconds _simulated_db_merging_latency;
bool _doInlineSplit;
bool _enableJoinForSiblingLessBuckets;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index d4f69073cc6..182aa2008c5 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -191,3 +191,11 @@ inhibit_merge_sending_on_busy_node_duration_sec int default=10
## For this option to take effect, the cluster controller must also have two-phase
## states enabled.
allow_stale_reads_during_cluster_state_transitions bool default=false
+
+## If greater than zero, injects a thread sleep into certain parts of the bucket
+## processing logic. This allows for easier testing of racing edge cases where the
+## main distributor thread is CPU-blocked processing large amounts of buckets, but
+## without actually needing to use a lot of buckets in the test itself.
+## Setting any of these values only makes sense for testing!
+simulated_db_pruning_latency_msec int default=0
+simulated_db_merging_latency_msec int default=0
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index c4c8dc439be..a901ac28a54 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -11,6 +11,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/vespalib/util/xmlstream.h>
+#include <thread>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.bucketdb.updater");
@@ -168,9 +169,30 @@ BucketDBUpdater::removeSuperfluousBuckets(
for (const auto& db_entry : proc.getNonOwnedEntries()) {
readOnlyDb.update(db_entry); // TODO Entry move support
}
+ maybe_inject_simulated_db_pruning_delay();
}
}
+namespace {
+
+void maybe_sleep_for(std::chrono::milliseconds ms) {
+ if (ms.count() > 0) {
+ std::this_thread::sleep_for(ms);
+ }
+}
+
+}
+
+void
+BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
+ maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency());
+}
+
+void
+BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
+ maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency());
+}
+
void
BucketDBUpdater::ensureTransitionTimerStarted()
{
@@ -603,6 +625,7 @@ BucketDBUpdater::activatePendingClusterState()
framework::MilliSecTimer process_timer(_distributorComponent.getClock());
_pendingClusterState->mergeIntoBucketDatabases();
+ maybe_inject_simulated_db_merging_delay();
if (_pendingClusterState->isVersionedTransition()) {
LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 663ac488ef4..e69d328d8bc 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -179,6 +179,9 @@ private:
void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
void sendAllQueuedBucketRechecks();
+ void maybe_inject_simulated_db_pruning_delay();
+ void maybe_inject_simulated_db_merging_delay();
+
friend class BucketDBUpdater_Test;
friend class MergeOperation_Test;