summaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storage/distributor/distributor_stripe_thread.h')
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h81
1 files changed, 81 insertions, 0 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
new file mode 100644
index 00000000000..b02d733895e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -0,0 +1,81 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace storage::distributor {
+
+class DistributorStripe;
+class DistributorStripePool;
+class TickableStripe;
+
+/**
+ * A DistributorStripeThread provides threading resources for a single distributor stripe
+ * and the means of synchronizing access towards it through a DistributorStripePool.
+ *
+ * A DistributorStripeThread instance is bidirectionally bound to a particular pool and
+ * should therefore always be created by the pool itself (never standalone).
+ */
+class DistributorStripeThread : private FastOS_Runnable {
+ using AtomicDuration = std::atomic<vespalib::duration>;
+
+ TickableStripe& _stripe;
+ DistributorStripePool& _stripe_pool;
+ AtomicDuration _tick_wait_duration;
+ std::mutex _mutex;
+ std::condition_variable _event_cond;
+ std::condition_variable _park_cond;
+ std::atomic<uint32_t> _ticks_before_wait;
+ std::atomic<bool> _should_park;
+ std::atomic<bool> _should_stop;
+ bool _waiting_for_event;
+
+ friend class DistributorStripePool;
+public:
+ DistributorStripeThread(TickableStripe& stripe,
+ DistributorStripePool& stripe_pool);
+ ~DistributorStripeThread();
+
+ void Run(FastOS_ThreadInterface*, void*) override;
+
+ // Wakes up stripe thread if it's currently waiting for an external event to be triggered,
+ // such as the arrival of a new RPC message. If thread is parked this call will have no
+ // effect.
+ void notify_event_has_triggered() noexcept;
+
+ void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
+ void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
+
+ TickableStripe* operator->() noexcept { return &_stripe; }
+ const TickableStripe* operator->() const noexcept { return &_stripe; }
+private:
+ [[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
+ return _should_stop.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] bool should_park_relaxed() const noexcept {
+ return _should_park.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept {
+ return _tick_wait_duration.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept {
+ return _ticks_before_wait.load(std::memory_order_relaxed);
+ }
+
+ void signal_wants_park() noexcept;
+ void unpark_thread() noexcept;
+ void wait_until_event_notified_or_timed_out() noexcept;
+ void wait_until_unparked() noexcept;
+
+ void signal_should_stop() noexcept;
+};
+
+}