summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-21 17:50:17 +0100
committerGitHub <noreply@github.com>2022-02-21 17:50:17 +0100
commita7e8bb9dcf3c674a3756e0f0383384593856415a (patch)
tree3944389e6b3d0e5b0ef7992808a3ca1ff24ff260
parentf67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff)
parenteddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff)
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp17
-rw-r--r--fnet/src/vespa/fnet/connection.cpp20
-rw-r--r--fnet/src/vespa/fnet/connection.h6
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp10
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp21
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.h10
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp19
-rw-r--r--storageframework/src/tests/thread/tickingthreadtest.cpp56
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp11
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h7
15 files changed, 127 insertions, 87 deletions
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index e3c238cf633..17c38ab6e3a 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -5,15 +5,16 @@
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/frt/invoker.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <atomic>
#include <thread>
struct Receptor : public FRT_IRequestWait
{
- FRT_RPCRequest *req;
+ std::atomic<FRT_RPCRequest*> req;
- Receptor() : req(0) {}
+ Receptor() : req(nullptr) {}
void RequestDone(FRT_RPCRequest *r) override {
- req = r;
+ req.store(r);
}
};
@@ -55,18 +56,18 @@ TEST("detach return invoke") {
target->InvokeSync(req, 5.0);
EXPECT_TRUE(!req->IsError());
for (uint32_t i = 0; i < 1000; ++i) {
- if (receptor.req != 0) {
+ if (receptor.req.load() != nullptr) {
break;
}
std::this_thread::sleep_for(10ms);
}
req->SubRef();
target->SubRef();
- if (receptor.req != 0) {
- EXPECT_TRUE(!receptor.req->IsError());
- receptor.req->SubRef();
+ if (receptor.req.load() != nullptr) {
+ EXPECT_TRUE(!receptor.req.load()->IsError());
+ receptor.req.load()->SubRef();
}
- EXPECT_TRUE(receptor.req != 0);
+ EXPECT_TRUE(receptor.req.load() != nullptr);
};
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index 83224178e15..e3259db848b 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -125,8 +125,8 @@ FNET_Connection::SetState(State state)
std::vector<FNET_Channel::UP> toDelete;
std::unique_lock<std::mutex> guard(_ioc_lock);
- oldstate = _state;
- _state = state;
+ oldstate = GetState();
+ _state.store(state, std::memory_order_relaxed);
if (LOG_WOULD_LOG(debug) && state != oldstate) {
LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
GetStateString(oldstate), GetStateString(state));
@@ -570,7 +570,7 @@ FNET_Connection::Init()
}
// handle close by admin channel init
- if (_state == FNET_CLOSED) {
+ if (GetState() == FNET_CLOSED) {
return false;
}
@@ -600,7 +600,7 @@ FNET_Connection::handle_handshake_act()
{
assert(_flags._handshake_work_pending);
_flags._handshake_work_pending = false;
- return ((_state == FNET_CONNECTING) && handshake());
+ return ((GetState() == FNET_CONNECTING) && handshake());
}
void
@@ -619,7 +619,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
FNET_Channel * ret = nullptr;
std::unique_lock<std::mutex> guard(_ioc_lock);
- if (__builtin_expect(_state < FNET_CLOSING, true)) {
+ if (__builtin_expect(GetState() < FNET_CLOSING, true)) {
newChannel->SetID(GetNextID());
if (chid != nullptr) {
*chid = newChannel->GetID();
@@ -698,7 +698,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
assert(packet != nullptr);
std::unique_lock<std::mutex> guard(_ioc_lock);
- if (_state >= FNET_CLOSING) {
+ if (GetState() >= FNET_CLOSING) {
if (_flags._discarding) {
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
} else {
@@ -710,7 +710,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
writeWork = _writeWork;
_writeWork++;
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
- if ((writeWork == 0) && (_state == FNET_CONNECTED)) {
+ if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) {
AddRef_NoLock();
guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
@@ -756,7 +756,7 @@ FNET_Connection::HandleReadEvent()
{
bool broken = false; // is connection broken ?
- switch(_state) {
+ switch(GetState()) {
case FNET_CONNECTING:
broken = !handshake();
break;
@@ -776,7 +776,7 @@ bool
FNET_Connection::writePendingAfterConnect()
{
std::lock_guard<std::mutex> guard(_ioc_lock);
- _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
+ _state.store(FNET_CONNECTED, std::memory_order_relaxed); // SetState(FNET_CONNECTED)
LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
return (_writeWork > 0);
@@ -787,7 +787,7 @@ FNET_Connection::HandleWriteEvent()
{
bool broken = false; // is connection broken ?
- switch(_state) {
+ switch(GetState()) {
case FNET_CONNECTING:
broken = !handshake();
break;
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index e86b670b7e5..af4483e23e5 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -100,7 +100,7 @@ private:
vespalib::CryptoSocket::UP _socket; // socket for this conn
ResolveHandlerSP _resolve_handler; // async resolve callback
FNET_Context _context; // connection context
- State _state; // connection state
+ std::atomic<State> _state; // connection state. May be polled outside lock
Flags _flags; // Packed flags.
uint32_t _packetLength; // packet length
uint32_t _packetCode; // packet code
@@ -339,9 +339,9 @@ public:
/**
- * @return current connection state.
+ * @return current connection state. May be stale if read outside lock.
**/
- State GetState() { return _state; }
+ State GetState() const noexcept { return _state.load(std::memory_order_relaxed); }
/**
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 53f8fea40cd..1d55ac3b327 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -236,7 +236,7 @@ FNET_TransportThread::~FNET_TransportThread()
{
std::lock_guard<std::mutex> guard(_shutdownLock);
}
- if (_started.load() && !_finished) {
+ if (_started.load() && !is_finished()) {
LOG(error, "Transport: delete called on active object!");
} else {
std::lock_guard guard(_pseudo_thread);
@@ -380,11 +380,11 @@ FNET_TransportThread::ShutDown(bool waitFinished)
void
FNET_TransportThread::WaitFinished()
{
- if (_finished)
+ if (is_finished())
return;
std::unique_lock<std::mutex> guard(_shutdownLock);
- while (!_finished)
+ while (!is_finished())
_shutdownCond.wait(guard);
}
@@ -501,7 +501,7 @@ FNET_TransportThread::EventLoopIteration() {
if (!IsShutDown())
return true;
- if (_finished)
+ if (is_finished())
return false;
endEventLoop();
@@ -557,7 +557,7 @@ FNET_TransportThread::endEventLoop() {
{
std::lock_guard<std::mutex> guard(_shutdownLock);
- _finished = true;
+ _finished.store(true, std::memory_order_relaxed);
_shutdownCond.notify_all();
}
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index cf1f68a8b39..dfdcf4e1970 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -50,7 +50,7 @@ private:
std::recursive_mutex _pseudo_thread; // used after transport thread has shut down
std::atomic<bool> _started; // event loop started ?
std::atomic<bool> _shutdown; // should stop event loop ?
- bool _finished; // event loop stopped ?
+ std::atomic<bool> _finished; // event loop stopped ?
/**
* Add an IOComponent to the list of components. This operation is
@@ -173,6 +173,10 @@ private:
return _shutdown.load(std::memory_order_relaxed);
}
+ [[nodiscard]] bool is_finished() const noexcept {
+ return _finished.load(std::memory_order_relaxed);
+ }
+
public:
FNET_TransportThread(const FNET_TransportThread &) = delete;
FNET_TransportThread &operator=(const FNET_TransportThread &) = delete;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index ed2ce3d638e..c33f918a39c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -423,6 +423,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
+ // Unschedule any pending target pool flush task that may race with shutdown target flushing
+ _scheduler.Kill(_targetPoolTask.get());
_transport->ShutDown(true);
_threadPool->Close();
_executor->shutdown().sync();
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index 44e6890415a..b403c65f863 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -54,14 +54,21 @@ void
RPCTargetPool::flushTargets(bool force)
{
uint64_t currentTime = _timer->getMilliTime();
+ // Erase RPC targets outside our lock to prevent the following mutex order inversion potential:
+ // flushTargets (pool lock) -> FNET transport thread post event (transport thread lock)
+ // FNET CheckTasks (transport thread lock) -> periodic flushTargets task run -> flushTargets (pool lock)
+ std::vector<Entry> to_erase_on_scope_exit;
LockGuard guard(_lock);
- TargetMap::iterator it = _targets.begin();
- while (it != _targets.end()) {
- const Entry &entry = it->second;
- if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
- _targets.erase(it++); // postfix increment to move the iterator
- } else {
- ++it;
+ {
+ auto it = _targets.begin();
+ while (it != _targets.end()) {
+ const Entry& entry = it->second;
+ if (!entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
+ to_erase_on_scope_exit.emplace_back(std::move(it->second));
+ it = _targets.erase(it);
+ } else {
+ ++it;
+ }
}
}
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index eb959dc17b4..80e5925f1a6 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -10,9 +10,10 @@ using namespace std::chrono;
namespace mbus {
-Resender::Resender(IRetryPolicy::SP retryPolicy) :
- _queue(),
- _retryPolicy(retryPolicy)
+Resender::Resender(IRetryPolicy::SP retryPolicy)
+ : _queue_mutex(),
+ _queue(),
+ _retryPolicy(retryPolicy)
{ }
Resender::~Resender()
@@ -26,13 +27,16 @@ Resender::~Resender()
void
Resender::resendScheduled()
{
- typedef std::vector<RoutingNode*> NodeList;
+ using NodeList = std::vector<RoutingNode*>;
NodeList sendList;
time_point now = steady_clock::now();
- while (!_queue.empty() && _queue.top().first <= now) {
- sendList.push_back(_queue.top().second);
- _queue.pop();
+ {
+ std::lock_guard guard(_queue_mutex);
+ while (!_queue.empty() && _queue.top().first <= now) {
+ sendList.push_back(_queue.top().second);
+ _queue.pop();
+ }
}
for (RoutingNode *node : sendList) {
@@ -84,6 +88,7 @@ Resender::scheduleRetry(RoutingNode &node)
TraceLevel::COMPONENT,
vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
+ std::lock_guard guard(_queue_mutex);
_queue.push(Entry(steady_clock::now() + delayMS, &node));
return true;
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h
index 599ac789cab..fbce5c7fe8e 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.h
+++ b/messagebus/src/vespa/messagebus/routing/resender.h
@@ -4,6 +4,7 @@
#include "iretrypolicy.h"
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
+#include <mutex>
#include <queue>
#include <vector>
@@ -30,6 +31,7 @@ private:
};
using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;
+ std::mutex _queue_mutex;
PriorityQueue _queue;
IRetryPolicy::SP _retryPolicy;
public:
@@ -45,7 +47,7 @@ public:
*
* @param retryPolicy The retry policy to use.
*/
- Resender(IRetryPolicy::SP retryPolicy);
+ explicit Resender(IRetryPolicy::SP retryPolicy);
/**
* Empties the retry queue.
@@ -59,7 +61,7 @@ public:
* @param errorCode The code to check.
* @return True if the message can be resent.
*/
- bool canRetry(uint32_t errorCode) const;
+ [[nodiscard]] bool canRetry(uint32_t errorCode) const;
/**
* Returns whether or not the given reply should be retried.
@@ -67,7 +69,7 @@ public:
* @param reply The reply to check.
* @return True if retry is required.
*/
- bool shouldRetry(const Reply &reply) const;
+ [[nodiscard]] bool shouldRetry(const Reply &reply) const;
/**
* Schedules the given node for resending, if enabled by message. This will
@@ -78,7 +80,7 @@ public:
* @param node The node to resend.
* @return True if the node was queued.
*/
- bool scheduleRetry(RoutingNode &node);
+ [[nodiscard]] bool scheduleRetry(RoutingNode &node);
/**
* Invokes {@link RoutingNode#send()} on all routing nodes that are
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index 3bff3649634..7e4980277e7 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -118,7 +118,7 @@ public:
search::SerialNum _currentSerial;
uint32_t _pendingDone;
uint32_t _taskDone;
- std::mutex _lock;
+ mutable std::mutex _lock;
vespalib::CountDownLatch _done;
FlushDoneHistory _flushDoneHistory;
@@ -146,7 +146,7 @@ public:
std::vector<IFlushTarget::SP>
getFlushTargets() override {
{
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
_pendingDone += _taskDone;
_taskDone = 0;
}
@@ -160,7 +160,7 @@ public:
// Called once by flush engine thread for each task done
void taskDone() {
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
++_taskDone;
}
@@ -168,7 +168,7 @@ public:
// added to flush engine and when one or more flush tasks related
// to flush handler have completed.
void flushDone(search::SerialNum oldestSerial) override {
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial);
_oldestSerial = std::max(_oldestSerial, oldestSerial);
_flushDoneHistory.push_back(oldestSerial);
@@ -179,9 +179,14 @@ public:
}
FlushDoneHistory getFlushDoneHistory() {
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
return _flushDoneHistory;
}
+
+ [[nodiscard]] search::SerialNum oldest_serial() const noexcept {
+ std::lock_guard guard(_lock);
+ return _oldestSerial;
+ }
};
void WrappedFlushTask::run()
@@ -440,11 +445,11 @@ struct Fixture
using namespace std::chrono_literals;
for (int pass = 0; pass < 600; ++pass) {
std::this_thread::sleep_for(100ms);
- if (handler._oldestSerial == expOldestSerial) {
+ if (handler.oldest_serial() == expOldestSerial) {
break;
}
}
- EXPECT_EQUAL(expOldestSerial, handler._oldestSerial);
+ EXPECT_EQUAL(expOldestSerial, handler.oldest_serial());
}
};
diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp
index 5fe8f25ae72..577e9d128b9 100644
--- a/storageframework/src/tests/thread/tickingthreadtest.cpp
+++ b/storageframework/src/tests/thread/tickingthreadtest.cpp
@@ -5,24 +5,31 @@
#include <vespa/storageframework/generic/thread/tickingthread.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/atomic.h>
#include <thread>
+using namespace vespalib::atomic;
+
namespace storage::framework::defaultimplementation {
namespace {
struct Context {
- uint64_t _critTickCount;
- uint64_t _nonCritTickCount;
+ std::atomic<uint64_t> _critTickCount;
+ std::atomic<uint64_t> _nonCritTickCount;
- Context() : _critTickCount(0), _nonCritTickCount(0) {}
+ constexpr Context() noexcept : _critTickCount(0), _nonCritTickCount(0) {}
+ Context(const Context& rhs) noexcept
+ : _critTickCount(load_relaxed(rhs._critTickCount)),
+ _nonCritTickCount(load_relaxed(rhs._nonCritTickCount))
+ {}
};
struct MyApp : public TickingThread {
- uint32_t _critOverlapCounter;
- bool _doCritOverlapTest;
- bool _critOverlap;
- std::vector<Context> _context;
+ std::atomic<uint32_t> _critOverlapCounter;
+ std::atomic<bool> _critOverlap;
+ bool _doCritOverlapTest;
+ std::vector<Context> _context;
TickingThreadPool::UP _threadPool;
MyApp(int threadCount, bool doCritOverlapTest = false);
@@ -34,62 +41,63 @@ struct MyApp : public TickingThread {
assert(index < _context.size());
Context& c(_context[index]);
if (_doCritOverlapTest) {
- uint32_t oldTick = _critOverlapCounter;
+ uint32_t oldTick = load_relaxed(_critOverlapCounter);
std::this_thread::sleep_for(1ms);
- _critOverlap |= (_critOverlapCounter != oldTick);
- ++_critOverlapCounter;
+ store_relaxed(_critOverlap, load_relaxed(_critOverlap) || (load_relaxed(_critOverlapCounter) != oldTick));
+ _critOverlapCounter.fetch_add(1, std::memory_order_relaxed);
}
- ++c._critTickCount;
+ c._critTickCount.fetch_add(1, std::memory_order_relaxed);
return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
}
ThreadWaitInfo doNonCriticalTick(ThreadIndex index) override {
assert(index < _context.size());
Context& c(_context[index]);
- ++c._nonCritTickCount;
+ c._nonCritTickCount.fetch_add(1, std::memory_order_relaxed);
return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
}
uint64_t getMinCritTick() {
uint64_t min = std::numeric_limits<uint64_t>().max();
for (uint32_t i=0; i<_context.size(); ++i) {
- min = std::min(min, _context[i]._critTickCount);
+ min = std::min(min, load_relaxed(_context[i]._critTickCount));
}
return min;
}
uint64_t getMinNonCritTick() {
uint64_t min = std::numeric_limits<uint64_t>().max();
for (uint32_t i=0; i<_context.size(); ++i) {
- min = std::min(min, _context[i]._critTickCount);
+ min = std::min(min, load_relaxed(_context[i]._critTickCount));
}
return min;
}
- uint64_t getTotalCritTicks() {
+ uint64_t getTotalCritTicks() const noexcept {
uint64_t total = 0;
for (uint32_t i=0; i<_context.size(); ++i) {
- total += _context[i]._critTickCount;
+ total += load_relaxed(_context[i]._critTickCount);
}
return total;
}
- uint64_t getTotalNonCritTicks() {
+ uint64_t getTotalNonCritTicks() const noexcept {
uint64_t total = 0;
for (uint32_t i=0; i<_context.size(); ++i) {
- total += _context[i]._nonCritTickCount;
+ total += load_relaxed(_context[i]._nonCritTickCount);
}
return total;
}
- uint64_t getTotalTicks()
- { return getTotalCritTicks() + getTotalNonCritTicks(); }
- bool hasCritOverlap() { return _critOverlap; }
+ uint64_t getTotalTicks() const noexcept {
+ return getTotalCritTicks() + getTotalNonCritTicks();
+ }
+ bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); }
};
MyApp::MyApp(int threadCount, bool doCritOverlapTest)
: _critOverlapCounter(0),
- _doCritOverlapTest(doCritOverlapTest),
_critOverlap(false),
+ _doCritOverlapTest(doCritOverlapTest),
_threadPool(TickingThreadPool::createDefault("testApp", 100ms))
{
for (int i=0; i<threadCount; ++i) {
_threadPool->addThread(*this);
- _context.push_back(Context());
+ _context.emplace_back();
}
}
@@ -182,7 +190,7 @@ TEST(TickingThreadTest, test_lock_critical_ticks)
app.start(testReg.getThreadPoolImpl());
while (!app.hasCritOverlap()) {
std::this_thread::sleep_for(1ms);
- ++app._critOverlapCounter;
+ app._critOverlapCounter.fetch_add(1, std::memory_order_relaxed);
++iterationsBeforeOverlap;
}
}
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
index 7281c55ba25..b8ef8e4610b 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
@@ -3,10 +3,13 @@
#include "threadimpl.h"
#include "threadpoolimpl.h"
#include <vespa/storageframework/generic/clock/clock.h>
+#include <vespa/vespalib/util/atomic.h>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".framework.thread.impl");
+using namespace vespalib::atomic;
+
namespace storage::framework::defaultimplementation {
ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
@@ -27,7 +30,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
_thread(*this),
_cpu_category(cpu_category)
{
- _tickData[_tickDataPtr]._lastTick = pool.getClock().getMonotonicTime();
+ _tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime();
_thread.start(_pool.getThreadPool());
}
@@ -105,15 +108,15 @@ ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now)
ThreadTickData
ThreadImpl::getTickData() const
{
- return _tickData[_tickDataPtr].loadRelaxed();
+ return _tickData[load_acquire(_tickDataPtr)].loadRelaxed();
}
void
ThreadImpl::setTickData(const ThreadTickData& tickData)
{
- uint32_t nextData = (_tickDataPtr + 1) % _tickData.size();
+ uint32_t nextData = (load_relaxed(_tickDataPtr) + 1) % _tickData.size();
_tickData[nextData].storeRelaxed(tickData);
- _tickDataPtr = nextData;
+ store_release(_tickDataPtr, nextData);
}
ThreadTickData
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index e6f0a21ea20..46a9412bf67 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -49,7 +49,7 @@ class ThreadImpl : public Thread
Runnable& _runnable;
ThreadProperties _properties;
std::array<AtomicThreadTickData, 3> _tickData;
- uint32_t _tickDataPtr;
+ std::atomic<uint32_t> _tickDataPtr;
std::atomic<bool> _interrupted;
bool _joined;
BackendThread _thread;
diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp
index c3230bf313d..ffa9f385967 100644
--- a/vespalib/src/vespa/vespalib/util/thread.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread.cpp
@@ -60,7 +60,7 @@ Thread &
Thread::stop()
{
std::unique_lock guard(_lock);
- _stopped = true;
+ _stopped.store(true, std::memory_order_relaxed);
_cond.notify_all();
return *this;
}
@@ -75,14 +75,14 @@ bool
Thread::slumber(double s)
{
std::unique_lock guard(_lock);
- if (!_stopped || _woken) {
+ if (!stopped() || _woken) {
if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) {
- _woken = _stopped;
+ _woken = stopped();
}
} else {
_woken = true;
}
- return !_stopped;
+ return !stopped();
}
Thread &
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index e08f3ca1100..0c7693556c7 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -6,6 +6,7 @@
#include "runnable.h"
#include "active.h"
#include <vespa/fastos/thread.h>
+#include <atomic>
namespace vespalib {
@@ -37,7 +38,7 @@ private:
FastOS_ThreadPool _pool;
std::mutex _lock;
std::condition_variable _cond;
- bool _stopped;
+ std::atomic<bool> _stopped;
bool _woken;
public:
@@ -46,7 +47,9 @@ public:
void start() override;
Thread &stop() override;
void join() override;
- bool stopped() const { return _stopped; }
+ [[nodiscard]] bool stopped() const noexcept {
+ return _stopped.load(std::memory_order_relaxed);
+ }
bool slumber(double s);
static Thread &currentThread();
static void sleep(size_t ms);