diff options
19 files changed, 67 insertions, 52 deletions
diff --git a/config/src/vespa/config/subscription/configsubscriptionset.cpp b/config/src/vespa/config/subscription/configsubscriptionset.cpp index f120272edcf..9b3ef32a128 100644 --- a/config/src/vespa/config/subscription/configsubscriptionset.cpp +++ b/config/src/vespa/config/subscription/configsubscriptionset.cpp @@ -42,7 +42,7 @@ ConfigSubscriptionSet::acquireSnapshot(uint64_t timeoutInMillis, bool ignoreChan bool inSync = false; LOG(debug, "Going into nextConfig loop, time left is %d", timeLeft); - while ((_state != CLOSED) && (timeLeft >= 0) && !inSync) { + while (!isClosed() && (timeLeft >= 0) && !inSync) { size_t numChanged = 0; size_t numGenerationChanged = 0; bool generationsInSync = true; @@ -117,7 +117,7 @@ ConfigSubscriptionSet::close() bool ConfigSubscriptionSet::isClosed() const { - return (_state == CLOSED); + return (_state.load(std::memory_order_relaxed) == CLOSED); } ConfigSubscription::SP diff --git a/config/src/vespa/config/subscription/configsubscriptionset.h b/config/src/vespa/config/subscription/configsubscriptionset.h index cebd3007b19..ce1f45b0eb8 100644 --- a/config/src/vespa/config/subscription/configsubscriptionset.h +++ b/config/src/vespa/config/subscription/configsubscriptionset.h @@ -9,6 +9,8 @@ #include "configsubscription.h" #include "configprovider.h" +#include <atomic> + namespace config { /** @@ -60,7 +62,7 @@ private: int64_t _currentGeneration; // Holds the current config generation. SubscriptionList _subscriptionList; // List of current subscriptions. - SubscriberState _state; // Current state of this subscriber. + std::atomic<SubscriberState> _state; // Current state of this subscriber. }; } // namespace config diff --git a/fastos/src/vespa/fastos/thread.cpp b/fastos/src/vespa/fastos/thread.cpp index 5e3400b70e3..3df8fa584a7 100644 --- a/fastos/src/vespa/fastos/thread.cpp +++ b/fastos/src/vespa/fastos/thread.cpp @@ -266,7 +266,7 @@ void FastOS_ThreadInterface::Hook () } _owner = nullptr; _startArg = nullptr; - _breakFlag = false; + _breakFlag.store(false, std::memory_order_relaxed); finished = _pool->isClosed(); dispatchedGuard.unlock(); // END lock @@ -322,7 +322,7 @@ void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) void FastOS_ThreadInterface::SetBreakFlag() { std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); - _breakFlag = true; + _breakFlag.store(true, std::memory_order_relaxed); _dispatchedCond.notify_one(); } diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h index 2726efe3cf0..c025a48d563 100644 --- a/fastos/src/vespa/fastos/thread.h +++ b/fastos/src/vespa/fastos/thread.h @@ -12,6 +12,7 @@ #include "types.h" +#include <atomic> #include <mutex> #include <condition_variable> @@ -294,7 +295,7 @@ protected: /** * Break flag. If true, the thread should exit. */ - bool _breakFlag; + std::atomic<bool> _breakFlag; /** * Is this thread active or free in the threadpool? @@ -385,7 +386,7 @@ public: */ bool GetBreakFlag () const { - return _breakFlag; + return _breakFlag.load(std::memory_order_relaxed); } /** diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index b38e2f0aa08..8ee33ad7960 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -116,7 +116,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, bool wasEmpty; { std::unique_lock<std::mutex> guard(_lock); - if (_shutdown) { + if (IsShutDown()) { guard.unlock(); SafeDiscardEvent(cpacket, context); return false; @@ -381,8 +381,8 @@ FNET_TransportThread::ShutDown(bool waitFinished) bool wasEmpty = false; { std::lock_guard<std::mutex> guard(_lock); - if (!_shutdown) { - _shutdown = true; + if (!IsShutDown()) { + _shutdown.store(true, std::memory_order_relaxed); wasEmpty = _queue.IsEmpty_NoLock(); } } @@ -519,7 +519,7 @@ FNET_TransportThread::EventLoopIteration() FastOS_Time beforeGetEvents; #endif - if (!_shutdown) { + if (!IsShutDown()) { #ifdef FNET_SANITY_CHECKS // Warn if event loop takes more than 250ms @@ -569,7 +569,7 @@ FNET_TransportThread::EventLoopIteration() FlushDeleteList(); } // -- END OF MAIN EVENT LOOP -- - if (!_shutdown) + if (!IsShutDown()) return true; if (_finished) return false; diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 86c7cf1dfcd..ffbbb7acc0f 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -10,6 +10,7 @@ #include <vespa/fastos/time.h> #include <vespa/vespalib/net/socket_handle.h> #include <vespa/vespalib/net/selector.h> +#include <atomic> #include <mutex> #include <condition_variable> @@ -48,7 +49,7 @@ private: std::condition_variable _cond; // used for synchronization std::recursive_mutex _pseudo_thread; // used after transport thread has shut down bool _started; // event loop started ? - bool _shutdown; // should stop event loop ? + std::atomic<bool> _shutdown; // should stop event loop ? bool _finished; // event loop stopped ? bool _waitFinished; // someone is waiting for _finished bool _deleted; // destructor called ? @@ -171,6 +172,10 @@ private: **/ bool EventLoopIteration(); + bool IsShutDown() const noexcept { + return _shutdown.load(std::memory_order_relaxed); + } + public: /** * Construct a transport object. To activate your newly created diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp index 7125446c168..4621c2229b6 100644 --- a/metrics/src/vespa/metrics/metricmanager.cpp +++ b/metrics/src/vespa/metrics/metricmanager.cpp @@ -176,7 +176,7 @@ MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, // Wait for first iteration to have completed, such that it is safe // to access snapshots afterwards. vespalib::MonitorGuard sync(_waiter); - while (_lastProcessedTime == 0) { + while (_lastProcessedTime.load(std::memory_order_relaxed) == 0) { sync.wait(1); } } else { @@ -863,10 +863,10 @@ MetricManager::tick(const MetricLockGuard & guard, time_t currentTime) } _forceEventLogging = false; } - _lastProcessedTime = (nextWorkTime <= currentTime ? nextWorkTime - : currentTime); + _lastProcessedTime.store(nextWorkTime <= currentTime ? nextWorkTime : currentTime, + std::memory_order_relaxed); LOG(spam, "Worker thread done with processing for time %" PRIu64 ".", - _lastProcessedTime); + _lastProcessedTime.load(std::memory_order_relaxed)); time_t next = std::min( _logPeriod.second, _snapshots[0]->getPeriod() + _snapshots[0]->getToTime()); diff --git a/metrics/src/vespa/metrics/metricmanager.h b/metrics/src/vespa/metrics/metricmanager.h index ec166529b94..4089236ca48 100644 --- a/metrics/src/vespa/metrics/metricmanager.h +++ b/metrics/src/vespa/metrics/metricmanager.h @@ -111,7 +111,7 @@ private: std::vector<MetricSnapshotSet::SP> _snapshots; MetricSnapshot::SP _totalMetrics; std::unique_ptr<Timer> _timer; - time_t _lastProcessedTime; + std::atomic<time_t> _lastProcessedTime; bool _forceEventLogging; // Should be added to config, but wont now due to problems with // upgrading @@ -262,7 +262,7 @@ public: void checkMetricsAltered(const MetricLockGuard &); /** Used by unit tests to verify that we have processed for a given time. */ - time_t getLastProcessedTime() const { return _lastProcessedTime; } + time_t getLastProcessedTime() const { return _lastProcessedTime.load(std::memory_order_relaxed); } /** Used by unit tests to wake waiters after altering time. */ void timeChangedNotification() const; diff --git a/slobrok/src/vespa/slobrok/sbregister.cpp b/slobrok/src/vespa/slobrok/sbregister.cpp index a1346feeece..72be5f69538 100644 --- a/slobrok/src/vespa/slobrok/sbregister.cpp +++ b/slobrok/src/vespa/slobrok/sbregister.cpp @@ -102,7 +102,7 @@ RegisterAPI::registerName(vespalib::stringref name) return; } } - _busy = true; + _busy.store(true, std::memory_order_relaxed); _names.push_back(name); _pending.push_back(name); discard(_unreg, name); @@ -114,7 +114,7 @@ void RegisterAPI::unregisterName(vespalib::stringref name) { std::lock_guard<std::mutex> guard(_lock); - _busy = true; + _busy.store(true, std::memory_order_relaxed); discard(_names, name); discard(_pending, name); _unreg.push_back(name); @@ -137,7 +137,7 @@ RegisterAPI::handleReqDone() _target->SubRef(); } _target = 0; - _busy = true; + _busy.store(true, std::memory_order_relaxed); } else { LOG(warning, "%s(%s -> %s) failed: %s", _req->GetMethodName(), @@ -241,7 +241,7 @@ RegisterAPI::handlePending() std::lock_guard<std::mutex> guard(_lock); _pending = _names; LOG(debug, "done, reschedule in 30s"); - _busy = false; + _busy.store(false, std::memory_order_relaxed); Schedule(30.0); } } diff --git a/slobrok/src/vespa/slobrok/sbregister.h b/slobrok/src/vespa/slobrok/sbregister.h index 234e6c076d4..8e8614fdf0a 100644 --- a/slobrok/src/vespa/slobrok/sbregister.h +++ b/slobrok/src/vespa/slobrok/sbregister.h @@ -6,6 +6,7 @@ #include "cfg.h" #include <vespa/fnet/frt/invoker.h> #include <vespa/fnet/frt/invokable.h> +#include <atomic> class FRT_Target; @@ -53,7 +54,7 @@ public: * * @return true if there are outstanding registration requests **/ - bool busy() const { return _busy; } + bool busy() const { return _busy.load(std::memory_order_relaxed); } private: class RPCHooks: public FRT_Invokable @@ -86,7 +87,7 @@ private: RPCHooks _hooks; std::mutex _lock; bool _reqDone; - bool _busy; + std::atomic<bool> _busy; SlobrokList _slobrokSpecs; Configurator::UP _configurator; vespalib::string _currSlobrok; diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 867ac9d38fc..e567206c371 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -29,6 +29,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/base/testdocman.h> +#include <atomic> namespace storage { @@ -49,7 +50,7 @@ protected: document::TestDocMan _docMan; TestNodeStateUpdater _nodeStateUpdater; vespalib::string _configId; - bool _initialized; + std::atomic<bool> _initialized; public: /** diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 61c55248370..f03d2fa3647 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -20,6 +20,7 @@ #include <vespa/persistence/spi/test.h> #include <vespa/config/common/exceptions.h> #include <vespa/fastos/file.h> +#include <atomic> #include <vespa/log/log.h> LOG_SETUP(".filestormanagertest"); @@ -668,8 +669,8 @@ class MessagePusherThread : public document::Runnable public: FileStorHandler& _handler; Document::SP _doc; - bool _done; - bool _threadDone; + std::atomic<bool> _done; + std::atomic<bool> _threadDone; MessagePusherThread(FileStorHandler& handler, Document::SP doc); ~MessagePusherThread(); @@ -698,10 +699,10 @@ public: const uint32_t _threadId; FileStorHandler& _handler; std::atomic<uint32_t> _config; - uint32_t _fetchedCount; - bool _done; - bool _failed; - bool _threadDone; + std::atomic<uint32_t> _fetchedCount; + std::atomic<bool> _done; + std::atomic<bool> _failed; + std::atomic<bool> _threadDone; MessageFetchingThread(FileStorHandler& handler) : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false), @@ -766,7 +767,7 @@ FileStorManagerTest::testHandlerPausedMultiThread() ResumeGuard guard = filestorHandler.pause(); thread._config.fetch_add(1); uint32_t count = thread._fetchedCount; - CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount); + CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount.load()); } pushthread._done = true; diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h index dc3bbef373c..a9d83ca6c1a 100644 --- a/storage/src/tests/storageserver/testvisitormessagesession.h +++ b/storage/src/tests/storageserver/testvisitormessagesession.h @@ -7,6 +7,8 @@ #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/storage/storageserver/priorityconverter.h> #include <vespa/config/subscription/configuri.h> + +#include <atomic> #include <deque> namespace storage { @@ -23,7 +25,7 @@ public: VisitorThread& thread; Visitor& visitor; - uint32_t pendingCount; + std::atomic<uint32_t> pendingCount; ~TestVisitorMessageSession(); diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index b53c8c270f2..97b357a3bf7 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -18,9 +18,9 @@ StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { - if (_state != CREATED) { + if (getState() != CREATED) { LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", - link->toString().c_str(), toString().c_str(), stateToString(_state)); + link->toString().c_str(), toString().c_str(), stateToString(getState())); assert(false); } assert(link); @@ -39,9 +39,9 @@ void StorageLink::open() // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { - if (link->_state != CREATED) { + if (link->getState() != CREATED) { LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", - toString().c_str(), stateToString(link->_state)); + toString().c_str(), stateToString(link->getState())); assert(false); } link->_state = OPENED; @@ -83,9 +83,9 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { - if (_state != CLOSING) { + if (getState() != CLOSING) { LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", - toString().c_str(), stateToString(_state)); + toString().c_str(), stateToString(getState())); assert(false); } // First flush down to get all requests out of the system. @@ -114,14 +114,14 @@ void StorageLink::flush() void StorageLink::sendDown(const StorageMessage::SP& msg) { // Verify acceptable state to send messages down - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: break; default: LOG(error, "Link %s trying to send %s down while in state %s", - toString().c_str(), msg->toString().c_str(), stateToString(_state)); + toString().c_str(), msg->toString().c_str(), stateToString(getState())); assert(false); } assert(msg.get()); @@ -155,7 +155,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { // Verify acceptable state to send messages up - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: @@ -163,7 +163,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) break; default: LOG(error, "Link %s trying to send %s up while in state %s", - toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); + toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); assert(false); } assert(msg.get()); diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 11705385646..8307c19dce6 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,6 +23,7 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> +#include <atomic> namespace storage { @@ -41,7 +42,7 @@ private: std::string _name; StorageLink* _up; std::unique_ptr<StorageLink> _down; - State _state; + std::atomic<State> _state; public: StorageLink(const StorageLink &) = delete; @@ -59,7 +60,7 @@ public: void push_back(StorageLink::UP); /** Get the current state of the storage link. */ - State getState() const { return _state; } + State getState() const noexcept { return _state.load(std::memory_order_relaxed); } /** * Called by storage server after the storage chain have been created. diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index d5b56c3ee53..1d33e829d49 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -202,7 +202,7 @@ VisitorThread::run(framework::ThreadHandle& thread) tick(); vespalib::MonitorGuard guard(_queueMonitor); if (_queue.empty()) { - guard.wait(_timeBetweenTicks); + guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed)); thread.registerTick(framework::WAIT_CYCLE); } continue; diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index a9dcd8f5806..2b15468fd3f 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -23,6 +23,7 @@ #include <vespa/metrics/metrictimer.h> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/sync.h> +#include <atomic> #include <deque> namespace storage { @@ -85,7 +86,7 @@ class VisitorThread : public framework::Runnable, uint32_t _visitorMemoryUsageLimit; framework::MilliSecTime _defaultDocBlockTimeout; framework::MilliSecTime _defaultVisitorInfoTimeout; - uint32_t _timeBetweenTicks; + std::atomic<uint32_t> _timeBetweenTicks; StorageComponent _component; framework::Thread::UP _thread; VisitorMessageSessionFactory& _messageSessionFactory; @@ -102,7 +103,7 @@ public: void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg); void shutdown(); - void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks = time; } + void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); /** For unit tests needing to pause thread. */ diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index b9ccb2d4d9f..7c30944d911 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -46,7 +46,7 @@ ThreadImpl::run() bool ThreadImpl::interrupted() const { - return _interrupted; + return _interrupted.load(std::memory_order_relaxed); } bool @@ -58,7 +58,7 @@ ThreadImpl::joined() const void ThreadImpl::interrupt() { - _interrupted = true; + _interrupted.store(true, std::memory_order_relaxed); _thread.stop(); } diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index 798251f0573..bb89c9167a3 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -42,7 +42,7 @@ class ThreadImpl : public Thread ThreadProperties _properties; std::array<AtomicThreadTickData, 3> _tickData; uint32_t _tickDataPtr; - bool _interrupted; + std::atomic<bool> _interrupted; bool _joined; BackendThread _thread; |