diff options
Diffstat (limited to 'storage/src')
7 files changed, 30 insertions, 24 deletions
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 867ac9d38fc..e567206c371 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -29,6 +29,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/base/testdocman.h> +#include <atomic> namespace storage { @@ -49,7 +50,7 @@ protected: document::TestDocMan _docMan; TestNodeStateUpdater _nodeStateUpdater; vespalib::string _configId; - bool _initialized; + std::atomic<bool> _initialized; public: /** diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 61c55248370..f03d2fa3647 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -20,6 +20,7 @@ #include <vespa/persistence/spi/test.h> #include <vespa/config/common/exceptions.h> #include <vespa/fastos/file.h> +#include <atomic> #include <vespa/log/log.h> LOG_SETUP(".filestormanagertest"); @@ -668,8 +669,8 @@ class MessagePusherThread : public document::Runnable public: FileStorHandler& _handler; Document::SP _doc; - bool _done; - bool _threadDone; + std::atomic<bool> _done; + std::atomic<bool> _threadDone; MessagePusherThread(FileStorHandler& handler, Document::SP doc); ~MessagePusherThread(); @@ -698,10 +699,10 @@ public: const uint32_t _threadId; FileStorHandler& _handler; std::atomic<uint32_t> _config; - uint32_t _fetchedCount; - bool _done; - bool _failed; - bool _threadDone; + std::atomic<uint32_t> _fetchedCount; + std::atomic<bool> _done; + std::atomic<bool> _failed; + std::atomic<bool> _threadDone; MessageFetchingThread(FileStorHandler& handler) : _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false), @@ -766,7 +767,7 @@ FileStorManagerTest::testHandlerPausedMultiThread() ResumeGuard guard = filestorHandler.pause(); thread._config.fetch_add(1); uint32_t count = thread._fetchedCount; - CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount); + CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount.load()); } pushthread._done = true; diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h index dc3bbef373c..a9d83ca6c1a 100644 --- a/storage/src/tests/storageserver/testvisitormessagesession.h +++ b/storage/src/tests/storageserver/testvisitormessagesession.h @@ -7,6 +7,8 @@ #include <vespa/documentapi/messagebus/messages/documentmessage.h> #include <vespa/storage/storageserver/priorityconverter.h> #include <vespa/config/subscription/configuri.h> + +#include <atomic> #include <deque> namespace storage { @@ -23,7 +25,7 @@ public: VisitorThread& thread; Visitor& visitor; - uint32_t pendingCount; + std::atomic<uint32_t> pendingCount; ~TestVisitorMessageSession(); diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index b53c8c270f2..97b357a3bf7 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -18,9 +18,9 @@ StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { - if (_state != CREATED) { + if (getState() != CREATED) { LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", - link->toString().c_str(), toString().c_str(), stateToString(_state)); + link->toString().c_str(), toString().c_str(), stateToString(getState())); assert(false); } assert(link); @@ -39,9 +39,9 @@ void StorageLink::open() // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { - if (link->_state != CREATED) { + if (link->getState() != CREATED) { LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", - toString().c_str(), stateToString(link->_state)); + toString().c_str(), stateToString(link->getState())); assert(false); } link->_state = OPENED; @@ -83,9 +83,9 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { - if (_state != CLOSING) { + if (getState() != CLOSING) { LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", - toString().c_str(), stateToString(_state)); + toString().c_str(), stateToString(getState())); assert(false); } // First flush down to get all requests out of the system. @@ -114,14 +114,14 @@ void StorageLink::flush() void StorageLink::sendDown(const StorageMessage::SP& msg) { // Verify acceptable state to send messages down - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: break; default: LOG(error, "Link %s trying to send %s down while in state %s", - toString().c_str(), msg->toString().c_str(), stateToString(_state)); + toString().c_str(), msg->toString().c_str(), stateToString(getState())); assert(false); } assert(msg.get()); @@ -155,7 +155,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { // Verify acceptable state to send messages up - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: @@ -163,7 +163,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) break; default: LOG(error, "Link %s trying to send %s up while in state %s", - toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); + toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); assert(false); } assert(msg.get()); diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 11705385646..8307c19dce6 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,6 +23,7 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> +#include <atomic> namespace storage { @@ -41,7 +42,7 @@ private: std::string _name; StorageLink* _up; std::unique_ptr<StorageLink> _down; - State _state; + std::atomic<State> _state; public: StorageLink(const StorageLink &) = delete; @@ -59,7 +60,7 @@ public: void push_back(StorageLink::UP); /** Get the current state of the storage link. */ - State getState() const { return _state; } + State getState() const noexcept { return _state.load(std::memory_order_relaxed); } /** * Called by storage server after the storage chain have been created. diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index d5b56c3ee53..1d33e829d49 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -202,7 +202,7 @@ VisitorThread::run(framework::ThreadHandle& thread) tick(); vespalib::MonitorGuard guard(_queueMonitor); if (_queue.empty()) { - guard.wait(_timeBetweenTicks); + guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed)); thread.registerTick(framework::WAIT_CYCLE); } continue; diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index a9dcd8f5806..2b15468fd3f 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -23,6 +23,7 @@ #include <vespa/metrics/metrictimer.h> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/sync.h> +#include <atomic> #include <deque> namespace storage { @@ -85,7 +86,7 @@ class VisitorThread : public framework::Runnable, uint32_t _visitorMemoryUsageLimit; framework::MilliSecTime _defaultDocBlockTimeout; framework::MilliSecTime _defaultVisitorInfoTimeout; - uint32_t _timeBetweenTicks; + std::atomic<uint32_t> _timeBetweenTicks; StorageComponent _component; framework::Thread::UP _thread; VisitorMessageSessionFactory& _messageSessionFactory; @@ -102,7 +103,7 @@ public: void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg); void shutdown(); - void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks = time; } + void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); /** For unit tests needing to pause thread. */ |