summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2022-01-21 16:06:21 +0000
committerHåvard Pettersen <havardpe@oath.com>2022-01-21 16:17:54 +0000
commit980c43eb4464bf11c48e996a2b0a88a8ba32da46 (patch)
treebf8821ee371b4f82ecd032ba1c3a3b7d7c0018bf
parent4ea7718e942e69fd6b0a52fe373dc3493a389a16 (diff)
explicitly wait for conflicts to happen
-rw-r--r--vespalib/src/tests/cpu_usage/cpu_usage_test.cpp26
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.cpp1
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.h4
3 files changed, 28 insertions, 3 deletions
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
index b2da9db857c..6bedfb5013e 100644
--- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
+++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
@@ -225,6 +225,17 @@ struct CpuUsage::Test {
Guard guard(my_usage._lock);
return my_usage._threads.size();
}
+ bool is_sampling() {
+ Guard guard(my_usage._lock);
+ return my_usage._sampling;
+ }
+ size_t count_conflicts() {
+ Guard guard(my_usage._lock);
+ if (!my_usage._conflict) {
+ return 0;
+ }
+ return my_usage._conflict->waiters;
+ }
size_t count_simple_samples() {
size_t result = 0;
for (const auto &simple: simple_list) {
@@ -282,7 +293,7 @@ TEST_F("require that threads added and removed between CpuUsage sample calls are
EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
}
-TEST_MT_FF("require that sample conflicts are resolved correctly", 4, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(3)) {
+TEST_MT_FF("require that sample conflicts are resolved correctly", 5, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(num_threads - 1)) {
if (thread_id == 0) {
CpuUsage::Sample s1;
s1[CpuUsage::Category::SETUP] = 10ms;
@@ -294,19 +305,30 @@ TEST_MT_FF("require that sample conflicts are resolved correctly", 4, CpuUsage::
s4[CpuUsage::Category::COMPACT] = 40ms;
f1.add_blocking();
f1.add_simple(s1); // should be sampled
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
TEST_BARRIER(); // #1
f1.blocking->sync_entry();
+ EXPECT_TRUE(f1.is_sampling());
+ while (f1.count_conflicts() < (num_threads - 2)) {
+ // wait for appropriate number of conflicts
+ std::this_thread::sleep_for(1ms);
+ }
f1.add_simple(s2); // should NOT be sampled (pending add)
f1.add_remove_simple(s3); // should be sampled (pending remove);
EXPECT_EQUAL(f1.count_threads(), 2u);
+ EXPECT_TRUE(f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), (num_threads - 2));
f1.blocking->swap_sample(s4);
TEST_BARRIER(); // #2
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
EXPECT_EQUAL(f1.count_threads(), 3u);
EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms));
EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms));
EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms));
EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms));
- for (size_t i = 1; i < 3; ++i) {
+ for (size_t i = 1; i < (num_threads - 1); ++i) {
EXPECT_EQUAL(f2[i].first, f2[0].first);
EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]);
EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]);
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
index 5f9e1f521b9..345da66cc39 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
@@ -247,6 +247,7 @@ CpuUsage::sample_or_wait()
_conflict = std::make_unique<SampleConflict>();
}
my_future = _conflict->future_sample;
+ _conflict->waiters++;
} else {
_sampling = true;
}
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index 7b44d07a0a3..b0e9f60311b 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -122,8 +122,10 @@ private:
struct SampleConflict {
std::promise<TimedSample> sample_promise;
std::shared_future<TimedSample> future_sample;
+ size_t waiters;
SampleConflict() : sample_promise(),
- future_sample(sample_promise.get_future()) {}
+ future_sample(sample_promise.get_future()),
+ waiters(0) {}
};
// Interface used to perform destructive sampling of the CPU spent