diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-21 17:50:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-21 17:50:17 +0100 |
commit | a7e8bb9dcf3c674a3756e0f0383384593856415a (patch) | |
tree | 3944389e6b3d0e5b0ef7992808a3ca1ff24ff260 | |
parent | f67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff) | |
parent | eddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff) |
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
15 files changed, 127 insertions, 87 deletions
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index e3c238cf633..17c38ab6e3a 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -5,15 +5,16 @@ #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/frt/invoker.h> #include <vespa/vespalib/util/stringfmt.h> +#include <atomic> #include <thread> struct Receptor : public FRT_IRequestWait { - FRT_RPCRequest *req; + std::atomic<FRT_RPCRequest*> req; - Receptor() : req(0) {} + Receptor() : req(nullptr) {} void RequestDone(FRT_RPCRequest *r) override { - req = r; + req.store(r); } }; @@ -55,18 +56,18 @@ TEST("detach return invoke") { target->InvokeSync(req, 5.0); EXPECT_TRUE(!req->IsError()); for (uint32_t i = 0; i < 1000; ++i) { - if (receptor.req != 0) { + if (receptor.req.load() != nullptr) { break; } std::this_thread::sleep_for(10ms); } req->SubRef(); target->SubRef(); - if (receptor.req != 0) { - EXPECT_TRUE(!receptor.req->IsError()); - receptor.req->SubRef(); + if (receptor.req.load() != nullptr) { + EXPECT_TRUE(!receptor.req.load()->IsError()); + receptor.req.load()->SubRef(); } - EXPECT_TRUE(receptor.req != 0); + EXPECT_TRUE(receptor.req.load() != nullptr); }; TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 83224178e15..e3259db848b 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -125,8 +125,8 @@ FNET_Connection::SetState(State state) std::vector<FNET_Channel::UP> toDelete; std::unique_lock<std::mutex> guard(_ioc_lock); - oldstate = _state; - _state = state; + oldstate = GetState(); + _state.store(state, std::memory_order_relaxed); if (LOG_WOULD_LOG(debug) && state != oldstate) { LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(oldstate), GetStateString(state)); @@ -570,7 +570,7 @@ FNET_Connection::Init() } // handle close by admin channel init - if (_state == FNET_CLOSED) { + if (GetState() == FNET_CLOSED) { return false; } @@ -600,7 +600,7 @@ FNET_Connection::handle_handshake_act() { assert(_flags._handshake_work_pending); _flags._handshake_work_pending = false; - return ((_state == FNET_CONNECTING) && handshake()); + return ((GetState() == FNET_CONNECTING) && handshake()); } void @@ -619,7 +619,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, FNET_Channel * ret = nullptr; std::unique_lock<std::mutex> guard(_ioc_lock); - if (__builtin_expect(_state < FNET_CLOSING, true)) { + if (__builtin_expect(GetState() < FNET_CLOSING, true)) { newChannel->SetID(GetNextID()); if (chid != nullptr) { *chid = newChannel->GetID(); @@ -698,7 +698,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) assert(packet != nullptr); std::unique_lock<std::mutex> guard(_ioc_lock); - if (_state >= FNET_CLOSING) { + if (GetState() >= FNET_CLOSING) { if (_flags._discarding) { _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); } else { @@ -710,7 +710,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) writeWork = _writeWork; _writeWork++; _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); - if ((writeWork == 0) && (_state == FNET_CONNECTED)) { + if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) { AddRef_NoLock(); guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); @@ -756,7 +756,7 @@ FNET_Connection::HandleReadEvent() { bool broken = false; // is connection broken ? - switch(_state) { + switch(GetState()) { case FNET_CONNECTING: broken = !handshake(); break; @@ -776,7 +776,7 @@ bool FNET_Connection::writePendingAfterConnect() { std::lock_guard<std::mutex> guard(_ioc_lock); - _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) + _state.store(FNET_CONNECTED, std::memory_order_relaxed); // SetState(FNET_CONNECTED) LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); return (_writeWork > 0); @@ -787,7 +787,7 @@ FNET_Connection::HandleWriteEvent() { bool broken = false; // is connection broken ? - switch(_state) { + switch(GetState()) { case FNET_CONNECTING: broken = !handshake(); break; diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index e86b670b7e5..af4483e23e5 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -100,7 +100,7 @@ private: vespalib::CryptoSocket::UP _socket; // socket for this conn ResolveHandlerSP _resolve_handler; // async resolve callback FNET_Context _context; // connection context - State _state; // connection state + std::atomic<State> _state; // connection state. May be polled outside lock Flags _flags; // Packed flags. uint32_t _packetLength; // packet length uint32_t _packetCode; // packet code @@ -339,9 +339,9 @@ public: /** - * @return current connection state. + * @return current connection state. May be stale if read outside lock. **/ - State GetState() { return _state; } + State GetState() const noexcept { return _state.load(std::memory_order_relaxed); } /** diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 53f8fea40cd..1d55ac3b327 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -236,7 +236,7 @@ FNET_TransportThread::~FNET_TransportThread() { std::lock_guard<std::mutex> guard(_shutdownLock); } - if (_started.load() && !_finished) { + if (_started.load() && !is_finished()) { LOG(error, "Transport: delete called on active object!"); } else { std::lock_guard guard(_pseudo_thread); @@ -380,11 +380,11 @@ FNET_TransportThread::ShutDown(bool waitFinished) void FNET_TransportThread::WaitFinished() { - if (_finished) + if (is_finished()) return; std::unique_lock<std::mutex> guard(_shutdownLock); - while (!_finished) + while (!is_finished()) _shutdownCond.wait(guard); } @@ -501,7 +501,7 @@ FNET_TransportThread::EventLoopIteration() { if (!IsShutDown()) return true; - if (_finished) + if (is_finished()) return false; endEventLoop(); @@ -557,7 +557,7 @@ FNET_TransportThread::endEventLoop() { { std::lock_guard<std::mutex> guard(_shutdownLock); - _finished = true; + _finished.store(true, std::memory_order_relaxed); _shutdownCond.notify_all(); } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index cf1f68a8b39..dfdcf4e1970 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -50,7 +50,7 @@ private: std::recursive_mutex _pseudo_thread; // used after transport thread has shut down std::atomic<bool> _started; // event loop started ? std::atomic<bool> _shutdown; // should stop event loop ? - bool _finished; // event loop stopped ? + std::atomic<bool> _finished; // event loop stopped ? /** * Add an IOComponent to the list of components. This operation is @@ -173,6 +173,10 @@ private: return _shutdown.load(std::memory_order_relaxed); } + [[nodiscard]] bool is_finished() const noexcept { + return _finished.load(std::memory_order_relaxed); + } + public: FNET_TransportThread(const FNET_TransportThread &) = delete; FNET_TransportThread &operator=(const FNET_TransportThread &) = delete; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index ed2ce3d638e..c33f918a39c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -423,6 +423,8 @@ RPCNetwork::sync() void RPCNetwork::shutdown() { + // Unschedule any pending target pool flush task that may race with shutdown target flushing + _scheduler.Kill(_targetPoolTask.get()); _transport->ShutDown(true); _threadPool->Close(); _executor->shutdown().sync(); diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index 44e6890415a..b403c65f863 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -54,14 +54,21 @@ void RPCTargetPool::flushTargets(bool force) { uint64_t currentTime = _timer->getMilliTime(); + // Erase RPC targets outside our lock to prevent the following mutex order inversion potential: + // flushTargets (pool lock) -> FNET transport thread post event (transport thread lock) + // FNET CheckTasks (transport thread lock) -> periodic flushTargets task run -> flushTargets (pool lock) + std::vector<Entry> to_erase_on_scope_exit; LockGuard guard(_lock); - TargetMap::iterator it = _targets.begin(); - while (it != _targets.end()) { - const Entry &entry = it->second; - if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) { - _targets.erase(it++); // postfix increment to move the iterator - } else { - ++it; + { + auto it = _targets.begin(); + while (it != _targets.end()) { + const Entry& entry = it->second; + if (!entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) { + to_erase_on_scope_exit.emplace_back(std::move(it->second)); + it = _targets.erase(it); + } else { + ++it; + } } } } diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index eb959dc17b4..80e5925f1a6 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -10,9 +10,10 @@ using namespace std::chrono; namespace mbus { -Resender::Resender(IRetryPolicy::SP retryPolicy) : - _queue(), - _retryPolicy(retryPolicy) +Resender::Resender(IRetryPolicy::SP retryPolicy) + : _queue_mutex(), + _queue(), + _retryPolicy(retryPolicy) { } Resender::~Resender() @@ -26,13 +27,16 @@ Resender::~Resender() void Resender::resendScheduled() { - typedef std::vector<RoutingNode*> NodeList; + using NodeList = std::vector<RoutingNode*>; NodeList sendList; time_point now = steady_clock::now(); - while (!_queue.empty() && _queue.top().first <= now) { - sendList.push_back(_queue.top().second); - _queue.pop(); + { + std::lock_guard guard(_queue_mutex); + while (!_queue.empty() && _queue.top().first <= now) { + sendList.push_back(_queue.top().second); + _queue.pop(); + } } for (RoutingNode *node : sendList) { @@ -84,6 +88,7 @@ Resender::scheduleRetry(RoutingNode &node) TraceLevel::COMPONENT, vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); + std::lock_guard guard(_queue_mutex); _queue.push(Entry(steady_clock::now() + delayMS, &node)); return true; } diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h index 599ac789cab..fbce5c7fe8e 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.h +++ b/messagebus/src/vespa/messagebus/routing/resender.h @@ -4,6 +4,7 @@ #include "iretrypolicy.h" #include <vespa/messagebus/queue.h> #include <vespa/messagebus/reply.h> +#include <mutex> #include <queue> #include <vector> @@ -30,6 +31,7 @@ private: }; using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>; + std::mutex _queue_mutex; PriorityQueue _queue; IRetryPolicy::SP _retryPolicy; public: @@ -45,7 +47,7 @@ public: * * @param retryPolicy The retry policy to use. */ - Resender(IRetryPolicy::SP retryPolicy); + explicit Resender(IRetryPolicy::SP retryPolicy); /** * Empties the retry queue. @@ -59,7 +61,7 @@ public: * @param errorCode The code to check. * @return True if the message can be resent. */ - bool canRetry(uint32_t errorCode) const; + [[nodiscard]] bool canRetry(uint32_t errorCode) const; /** * Returns whether or not the given reply should be retried. @@ -67,7 +69,7 @@ public: * @param reply The reply to check. * @return True if retry is required. */ - bool shouldRetry(const Reply &reply) const; + [[nodiscard]] bool shouldRetry(const Reply &reply) const; /** * Schedules the given node for resending, if enabled by message. This will @@ -78,7 +80,7 @@ public: * @param node The node to resend. * @return True if the node was queued. */ - bool scheduleRetry(RoutingNode &node); + [[nodiscard]] bool scheduleRetry(RoutingNode &node); /** * Invokes {@link RoutingNode#send()} on all routing nodes that are diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index 3bff3649634..7e4980277e7 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -118,7 +118,7 @@ public: search::SerialNum _currentSerial; uint32_t _pendingDone; uint32_t _taskDone; - std::mutex _lock; + mutable std::mutex _lock; vespalib::CountDownLatch _done; FlushDoneHistory _flushDoneHistory; @@ -146,7 +146,7 @@ public: std::vector<IFlushTarget::SP> getFlushTargets() override { { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); _pendingDone += _taskDone; _taskDone = 0; } @@ -160,7 +160,7 @@ public: // Called once by flush engine thread for each task done void taskDone() { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); ++_taskDone; } @@ -168,7 +168,7 @@ public: // added to flush engine and when one or more flush tasks related // to flush handler have completed. void flushDone(search::SerialNum oldestSerial) override { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial); _oldestSerial = std::max(_oldestSerial, oldestSerial); _flushDoneHistory.push_back(oldestSerial); @@ -179,9 +179,14 @@ public: } FlushDoneHistory getFlushDoneHistory() { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); return _flushDoneHistory; } + + [[nodiscard]] search::SerialNum oldest_serial() const noexcept { + std::lock_guard guard(_lock); + return _oldestSerial; + } }; void WrappedFlushTask::run() @@ -440,11 +445,11 @@ struct Fixture using namespace std::chrono_literals; for (int pass = 0; pass < 600; ++pass) { std::this_thread::sleep_for(100ms); - if (handler._oldestSerial == expOldestSerial) { + if (handler.oldest_serial() == expOldestSerial) { break; } } - EXPECT_EQUAL(expOldestSerial, handler._oldestSerial); + EXPECT_EQUAL(expOldestSerial, handler.oldest_serial()); } }; diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp index 5fe8f25ae72..577e9d128b9 100644 --- a/storageframework/src/tests/thread/tickingthreadtest.cpp +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -5,24 +5,31 @@ #include <vespa/storageframework/generic/thread/tickingthread.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/atomic.h> #include <thread> +using namespace vespalib::atomic; + namespace storage::framework::defaultimplementation { namespace { struct Context { - uint64_t _critTickCount; - uint64_t _nonCritTickCount; + std::atomic<uint64_t> _critTickCount; + std::atomic<uint64_t> _nonCritTickCount; - Context() : _critTickCount(0), _nonCritTickCount(0) {} + constexpr Context() noexcept : _critTickCount(0), _nonCritTickCount(0) {} + Context(const Context& rhs) noexcept + : _critTickCount(load_relaxed(rhs._critTickCount)), + _nonCritTickCount(load_relaxed(rhs._nonCritTickCount)) + {} }; struct MyApp : public TickingThread { - uint32_t _critOverlapCounter; - bool _doCritOverlapTest; - bool _critOverlap; - std::vector<Context> _context; + std::atomic<uint32_t> _critOverlapCounter; + std::atomic<bool> _critOverlap; + bool _doCritOverlapTest; + std::vector<Context> _context; TickingThreadPool::UP _threadPool; MyApp(int threadCount, bool doCritOverlapTest = false); @@ -34,62 +41,63 @@ struct MyApp : public TickingThread { assert(index < _context.size()); Context& c(_context[index]); if (_doCritOverlapTest) { - uint32_t oldTick = _critOverlapCounter; + uint32_t oldTick = load_relaxed(_critOverlapCounter); std::this_thread::sleep_for(1ms); - _critOverlap |= (_critOverlapCounter != oldTick); - ++_critOverlapCounter; + store_relaxed(_critOverlap, load_relaxed(_critOverlap) || (load_relaxed(_critOverlapCounter) != oldTick)); + _critOverlapCounter.fetch_add(1, std::memory_order_relaxed); } - ++c._critTickCount; + c._critTickCount.fetch_add(1, std::memory_order_relaxed); return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; } ThreadWaitInfo doNonCriticalTick(ThreadIndex index) override { assert(index < _context.size()); Context& c(_context[index]); - ++c._nonCritTickCount; + c._nonCritTickCount.fetch_add(1, std::memory_order_relaxed); return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; } uint64_t getMinCritTick() { uint64_t min = std::numeric_limits<uint64_t>().max(); for (uint32_t i=0; i<_context.size(); ++i) { - min = std::min(min, _context[i]._critTickCount); + min = std::min(min, load_relaxed(_context[i]._critTickCount)); } return min; } uint64_t getMinNonCritTick() { uint64_t min = std::numeric_limits<uint64_t>().max(); for (uint32_t i=0; i<_context.size(); ++i) { - min = std::min(min, _context[i]._critTickCount); + min = std::min(min, load_relaxed(_context[i]._critTickCount)); } return min; } - uint64_t getTotalCritTicks() { + uint64_t getTotalCritTicks() const noexcept { uint64_t total = 0; for (uint32_t i=0; i<_context.size(); ++i) { - total += _context[i]._critTickCount; + total += load_relaxed(_context[i]._critTickCount); } return total; } - uint64_t getTotalNonCritTicks() { + uint64_t getTotalNonCritTicks() const noexcept { uint64_t total = 0; for (uint32_t i=0; i<_context.size(); ++i) { - total += _context[i]._nonCritTickCount; + total += load_relaxed(_context[i]._nonCritTickCount); } return total; } - uint64_t getTotalTicks() - { return getTotalCritTicks() + getTotalNonCritTicks(); } - bool hasCritOverlap() { return _critOverlap; } + uint64_t getTotalTicks() const noexcept { + return getTotalCritTicks() + getTotalNonCritTicks(); + } + bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); } }; MyApp::MyApp(int threadCount, bool doCritOverlapTest) : _critOverlapCounter(0), - _doCritOverlapTest(doCritOverlapTest), _critOverlap(false), + _doCritOverlapTest(doCritOverlapTest), _threadPool(TickingThreadPool::createDefault("testApp", 100ms)) { for (int i=0; i<threadCount; ++i) { _threadPool->addThread(*this); - _context.push_back(Context()); + _context.emplace_back(); } } @@ -182,7 +190,7 @@ TEST(TickingThreadTest, test_lock_critical_ticks) app.start(testReg.getThreadPoolImpl()); while (!app.hasCritOverlap()) { std::this_thread::sleep_for(1ms); - ++app._critOverlapCounter; + app._critOverlapCounter.fetch_add(1, std::memory_order_relaxed); ++iterationsBeforeOverlap; } } diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index 7281c55ba25..b8ef8e4610b 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -3,10 +3,13 @@ #include "threadimpl.h" #include "threadpoolimpl.h" #include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/vespalib/util/atomic.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".framework.thread.impl"); +using namespace vespalib::atomic; + namespace storage::framework::defaultimplementation { ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, @@ -27,7 +30,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, _thread(*this), _cpu_category(cpu_category) { - _tickData[_tickDataPtr]._lastTick = pool.getClock().getMonotonicTime(); + _tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime(); _thread.start(_pool.getThreadPool()); } @@ -105,15 +108,15 @@ ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now) ThreadTickData ThreadImpl::getTickData() const { - return _tickData[_tickDataPtr].loadRelaxed(); + return _tickData[load_acquire(_tickDataPtr)].loadRelaxed(); } void ThreadImpl::setTickData(const ThreadTickData& tickData) { - uint32_t nextData = (_tickDataPtr + 1) % _tickData.size(); + uint32_t nextData = (load_relaxed(_tickDataPtr) + 1) % _tickData.size(); _tickData[nextData].storeRelaxed(tickData); - _tickDataPtr = nextData; + store_release(_tickDataPtr, nextData); } ThreadTickData diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index e6f0a21ea20..46a9412bf67 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -49,7 +49,7 @@ class ThreadImpl : public Thread Runnable& _runnable; ThreadProperties _properties; std::array<AtomicThreadTickData, 3> _tickData; - uint32_t _tickDataPtr; + std::atomic<uint32_t> _tickDataPtr; std::atomic<bool> _interrupted; bool _joined; BackendThread _thread; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index c3230bf313d..ffa9f385967 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -60,7 +60,7 @@ Thread & Thread::stop() { std::unique_lock guard(_lock); - _stopped = true; + _stopped.store(true, std::memory_order_relaxed); _cond.notify_all(); return *this; } @@ -75,14 +75,14 @@ bool Thread::slumber(double s) { std::unique_lock guard(_lock); - if (!_stopped || _woken) { + if (!stopped() || _woken) { if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) { - _woken = _stopped; + _woken = stopped(); } } else { _woken = true; } - return !_stopped; + return !stopped(); } Thread & diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index e08f3ca1100..0c7693556c7 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -6,6 +6,7 @@ #include "runnable.h" #include "active.h" #include <vespa/fastos/thread.h> +#include <atomic> namespace vespalib { @@ -37,7 +38,7 @@ private: FastOS_ThreadPool _pool; std::mutex _lock; std::condition_variable _cond; - bool _stopped; + std::atomic<bool> _stopped; bool _woken; public: @@ -46,7 +47,9 @@ public: void start() override; Thread &stop() override; void join() override; - bool stopped() const { return _stopped; } + [[nodiscard]] bool stopped() const noexcept { + return _stopped.load(std::memory_order_relaxed); + } bool slumber(double s); static Thread ¤tThread(); static void sleep(size_t ms); |