diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2019-01-10 15:06:11 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@oath.com> | 2019-01-10 15:06:11 +0000 |
commit | 7d1dda33a7b4e1e1727947c0104ed86036a5bef4 (patch) | |
tree | cbd0655c11aa50069820a14932918ff0b599bc1c /storage | |
parent | aa2701268df26145d8a58acc7603b4006d00e288 (diff) |
Use atomics for flags and configs set and read across threads
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/common/teststorageapp.h | 3 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelink.cpp | 20 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelink.h | 5 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitorthread.cpp | 2 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitorthread.h | 5 |
5 files changed, 19 insertions, 16 deletions
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 867ac9d38fc..e567206c371 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -29,6 +29,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/base/testdocman.h> +#include <atomic> namespace storage { @@ -49,7 +50,7 @@ protected: document::TestDocMan _docMan; TestNodeStateUpdater _nodeStateUpdater; vespalib::string _configId; - bool _initialized; + std::atomic<bool> _initialized; public: /** diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index b53c8c270f2..97b357a3bf7 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -18,9 +18,9 @@ StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { - if (_state != CREATED) { + if (getState() != CREATED) { LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", - link->toString().c_str(), toString().c_str(), stateToString(_state)); + link->toString().c_str(), toString().c_str(), stateToString(getState())); assert(false); } assert(link); @@ -39,9 +39,9 @@ void StorageLink::open() // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { - if (link->_state != CREATED) { + if (link->getState() != CREATED) { LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", - toString().c_str(), stateToString(link->_state)); + toString().c_str(), stateToString(link->getState())); assert(false); } link->_state = OPENED; @@ -83,9 +83,9 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { - if (_state != CLOSING) { + if (getState() != CLOSING) { LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", - toString().c_str(), stateToString(_state)); + toString().c_str(), stateToString(getState())); assert(false); } // First flush down to get all requests out of the system. @@ -114,14 +114,14 @@ void StorageLink::flush() void StorageLink::sendDown(const StorageMessage::SP& msg) { // Verify acceptable state to send messages down - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: break; default: LOG(error, "Link %s trying to send %s down while in state %s", - toString().c_str(), msg->toString().c_str(), stateToString(_state)); + toString().c_str(), msg->toString().c_str(), stateToString(getState())); assert(false); } assert(msg.get()); @@ -155,7 +155,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { // Verify acceptable state to send messages up - switch(_state) { + switch(getState()) { case OPENED: case CLOSING: case FLUSHINGDOWN: @@ -163,7 +163,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) break; default: LOG(error, "Link %s trying to send %s up while in state %s", - toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); + toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); assert(false); } assert(msg.get()); diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 11705385646..8307c19dce6 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,6 +23,7 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> +#include <atomic> namespace storage { @@ -41,7 +42,7 @@ private: std::string _name; StorageLink* _up; std::unique_ptr<StorageLink> _down; - State _state; + std::atomic<State> _state; public: StorageLink(const StorageLink &) = delete; @@ -59,7 +60,7 @@ public: void push_back(StorageLink::UP); /** Get the current state of the storage link. */ - State getState() const { return _state; } + State getState() const noexcept { return _state.load(std::memory_order_relaxed); } /** * Called by storage server after the storage chain have been created. diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index d5b56c3ee53..1d33e829d49 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -202,7 +202,7 @@ VisitorThread::run(framework::ThreadHandle& thread) tick(); vespalib::MonitorGuard guard(_queueMonitor); if (_queue.empty()) { - guard.wait(_timeBetweenTicks); + guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed)); thread.registerTick(framework::WAIT_CYCLE); } continue; diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index a9dcd8f5806..2b15468fd3f 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -23,6 +23,7 @@ #include <vespa/metrics/metrictimer.h> #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/sync.h> +#include <atomic> #include <deque> namespace storage { @@ -85,7 +86,7 @@ class VisitorThread : public framework::Runnable, uint32_t _visitorMemoryUsageLimit; framework::MilliSecTime _defaultDocBlockTimeout; framework::MilliSecTime _defaultVisitorInfoTimeout; - uint32_t _timeBetweenTicks; + std::atomic<uint32_t> _timeBetweenTicks; StorageComponent _component; framework::Thread::UP _thread; VisitorMessageSessionFactory& _messageSessionFactory; @@ -102,7 +103,7 @@ public: void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg); void shutdown(); - void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks = time; } + void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); /** For unit tests needing to pause thread. */ |