summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-02-16 15:50:02 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-02-16 15:50:02 +0000
commit2fc34559f65c88fafe8b3c53ea08ba7b4d56c3ff (patch)
tree398a6cf2e287bf84e0d9ac934b5fa5bf2b960fd2 /staging_vespalib
parent1d37355c40b55874eb5f7a0c2e32afe9e9d2504f (diff)
Make Runnable state polling thread safe
Used by Runnable sub-class(es) run-loops, which happens outside its mutex.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp45
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.h29
2 files changed, 45 insertions, 29 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
index 54d90a0f310..d7534514f41 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
@@ -15,28 +15,47 @@ Runnable::Runnable()
Runnable::~Runnable() {
std::lock_guard monitorGuard(_stateLock);
- assert(_state == NOT_RUNNING);
+ assert(getState() == NOT_RUNNING);
}
bool Runnable::start(FastOS_ThreadPool& pool)
{
std::unique_lock guard(_stateLock);
- _stateCond.wait(guard, [&](){ return (_state != STOPPING);});
+ _stateCond.wait(guard, [&](){ return (getState() != STOPPING);});
- if (_state != NOT_RUNNING) return false;
- _state = STARTING;
+ if (getState() != NOT_RUNNING) return false;
+ set_state(STARTING);
if (pool.NewThread(this) == nullptr) {
throw vespalib::IllegalStateException("Failed starting a new thread", VESPA_STRLOC);
}
return true;
}
+void Runnable::set_state(State new_state) noexcept
+{
+ _state.store(new_state, std::memory_order_relaxed);
+}
+
+bool Runnable::stopping() const noexcept
+{
+ State s(getState());
+ return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag());
+}
+
+bool Runnable::running() const noexcept
+{
+ State s(getState());
+ // Must check break-flag too, as threadpool will use that to close
+ // down.
+ return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag()));
+}
+
bool Runnable::stop()
{
std::lock_guard monitor(_stateLock);
- if (_state == STOPPING || _state == NOT_RUNNING) return false;
+ if (getState() == STOPPING || getState() == NOT_RUNNING) return false;
GetThread()->SetBreakFlag();
- _state = STOPPING;
+ set_state(STOPPING);
return onStop();
}
@@ -48,8 +67,8 @@ bool Runnable::onStop()
bool Runnable::join() const
{
std::unique_lock guard(_stateLock);
- assert ((_state != STARTING) && (_state != RUNNING));
- _stateCond.wait(guard, [&](){ return (_state == NOT_RUNNING);});
+ assert ((getState() != STARTING) && (getState() != RUNNING));
+ _stateCond.wait(guard, [&](){ return (getState() == NOT_RUNNING);});
return true;
}
@@ -57,21 +76,21 @@ void Runnable::Run(FastOS_ThreadInterface*, void*)
{
{
std::lock_guard guard(_stateLock);
- // Dont set state if its alreadyt at stopping. (And let run() be
+ // Don't set state if its already at stopping. (And let run() be
// called even though about to stop for consistency)
- if (_state == STARTING) {
- _state = RUNNING;
+ if (getState() == STARTING) {
+ set_state(RUNNING);
}
}
// By not catching exceptions, they should abort whole application.
- // We should thus not need to have a catch all to set state to not
+ // We should thus not need to have a catch-all to set state to not
// running.
run();
{
std::lock_guard guard(_stateLock);
- _state = NOT_RUNNING;
+ set_state(NOT_RUNNING);
_stateCond.notify_all();
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
index cf2befcc8d5..5ca344ea7ef 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
@@ -19,6 +19,7 @@
#pragma once
#include <vespa/fastos/thread.h>
+#include <atomic>
namespace document {
@@ -29,16 +30,17 @@ public:
private:
mutable std::mutex _stateLock;
mutable std::condition_variable _stateCond;
- State _state;
+ std::atomic<State> _state;
void Run(FastOS_ThreadInterface*, void*) override;
+ void set_state(State new_state) noexcept; // _stateLock must be held
public:
/**
* Create a runnable.
* @param pool If set, runnable will be started in constructor.
*/
Runnable();
- ~Runnable();
+ ~Runnable() override;
/**
* Start this runnable.
@@ -71,26 +73,21 @@ public:
*/
virtual void run() = 0;
- /** Get the current state of this runnable. */
- State getState() const { return _state; }
+ /**
+ * Get the current state of this runnable.
+ * Thread safe (but relaxed) read; may be stale if done outside _stateLock.
+ */
+ [[nodiscard]] State getState() const noexcept {
+ return _state.load(std::memory_order_relaxed);
+ }
/** Check if system is in the process of stopping. */
- bool stopping() const
- {
- State s(getState());
- return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag());
- }
+ [[nodiscard]] bool stopping() const noexcept;
/**
* Checks if runnable is running or not. (Started is considered running)
*/
- bool running() const
- {
- State s(getState());
- // Must check breakflag too, as threadpool will use that to close
- // down.
- return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag()));
- }
+ [[nodiscard]] bool running() const noexcept;
};
}