summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-09-17 12:12:22 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-09-17 12:14:11 +0000
commit39bc6bf9f2531bb22ebbfc5be3cb09082f02ea42 (patch)
treec596ef4e0cf55e8eb56bca2d845edfcec0f33aad /storage
parentfb4b666e78d936d0f6aadcc4f74cf208e8056776 (diff)
Add config overrides for simulating processing of large bucket sets
Allows injecting artificial thread delays during bucket DB pruning and merging on the distributor. By default, of course, will not inject any delays at all.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp6
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp4
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h9
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def8
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp23
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h3
6 files changed, 50 insertions, 3 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 0def395aff0..19012cfab18 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -64,7 +64,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
BucketManager::~BucketManager()
{
- if (_thread.get() != 0) {
+ if (_thread) {
LOG(error, "BucketManager deleted without calling close() first");
onClose();
}
@@ -75,9 +75,9 @@ BucketManager::~BucketManager()
void BucketManager::onClose()
{
// Stop internal thread such that we don't send any more messages down.
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interruptAndJoin(_workerLock, _workerCond);
- _thread.reset(0);
+ _thread.reset();
}
StorageLinkQueued::onClose();
}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index d2dd74693c8..341f0f39d8f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -31,6 +31,8 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_minBucketsPerVisitor(5),
_maxClusterClockSkew(0),
_inhibitMergeSendingOnBusyNodeDuration(std::chrono::seconds(60)),
+ _simulated_db_pruning_latency(0),
+ _simulated_db_merging_latency(0),
_doInlineSplit(true),
_enableJoinForSiblingLessBuckets(false),
_enableInconsistentJoin(false),
@@ -159,6 +161,8 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
if (config.inhibitMergeSendingOnBusyNodeDurationSec >= 0) {
_inhibitMergeSendingOnBusyNodeDuration = std::chrono::seconds(config.inhibitMergeSendingOnBusyNodeDurationSec);
}
+ _simulated_db_pruning_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbPruningLatencyMsec));
+ _simulated_db_merging_latency = std::chrono::milliseconds(std::max(0, config.simulatedDbMergingLatencyMsec));
LOG(debug,
"Distributor now using new configuration parameters. Split limits: %d docs/%d bytes. "
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index a58767164a7..28a219dc3f6 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -229,6 +229,13 @@ public:
return _inhibitMergeSendingOnBusyNodeDuration;
}
+ std::chrono::milliseconds simulated_db_pruning_latency() const noexcept {
+ return _simulated_db_pruning_latency;
+ }
+ std::chrono::milliseconds simulated_db_merging_latency() const noexcept {
+ return _simulated_db_merging_latency;
+ }
+
bool getSequenceMutatingOperations() const noexcept {
return _sequenceMutatingOperations;
}
@@ -276,6 +283,8 @@ private:
MaintenancePriorities _maintenancePriorities;
std::chrono::seconds _maxClusterClockSkew;
std::chrono::seconds _inhibitMergeSendingOnBusyNodeDuration;
+ std::chrono::milliseconds _simulated_db_pruning_latency;
+ std::chrono::milliseconds _simulated_db_merging_latency;
bool _doInlineSplit;
bool _enableJoinForSiblingLessBuckets;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index d4f69073cc6..182aa2008c5 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -191,3 +191,11 @@ inhibit_merge_sending_on_busy_node_duration_sec int default=10
## For this option to take effect, the cluster controller must also have two-phase
## states enabled.
allow_stale_reads_during_cluster_state_transitions bool default=false
+
+## If greater than zero, injects a thread sleep into certain parts of the bucket
+## processing logic. This allows for easier testing of racing edge cases where the
+## main distributor thread is CPU-blocked processing large amounts of buckets, but
+## without actually needing to use a lot of buckets in the test itself.
+## Setting any of these values only makes sense for testing!
+simulated_db_pruning_latency_msec int default=0
+simulated_db_merging_latency_msec int default=0
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index c4c8dc439be..a901ac28a54 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -11,6 +11,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/vespalib/util/xmlstream.h>
+#include <thread>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.bucketdb.updater");
@@ -168,9 +169,30 @@ BucketDBUpdater::removeSuperfluousBuckets(
for (const auto& db_entry : proc.getNonOwnedEntries()) {
readOnlyDb.update(db_entry); // TODO Entry move support
}
+ maybe_inject_simulated_db_pruning_delay();
}
}
+namespace {
+
+void maybe_sleep_for(std::chrono::milliseconds ms) {
+ if (ms.count() > 0) {
+ std::this_thread::sleep_for(ms);
+ }
+}
+
+}
+
+void
+BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
+ maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency());
+}
+
+void
+BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
+ maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency());
+}
+
void
BucketDBUpdater::ensureTransitionTimerStarted()
{
@@ -603,6 +625,7 @@ BucketDBUpdater::activatePendingClusterState()
framework::MilliSecTimer process_timer(_distributorComponent.getClock());
_pendingClusterState->mergeIntoBucketDatabases();
+ maybe_inject_simulated_db_merging_delay();
if (_pendingClusterState->isVersionedTransition()) {
LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 663ac488ef4..e69d328d8bc 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -179,6 +179,9 @@ private:
void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
void sendAllQueuedBucketRechecks();
+ void maybe_inject_simulated_db_pruning_delay();
+ void maybe_inject_simulated_db_merging_delay();
+
friend class BucketDBUpdater_Test;
friend class MergeOperation_Test;