summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2019-01-15 09:48:44 +0100
committerGitHub <noreply@github.com>2019-01-15 09:48:44 +0100
commit959655d2bd9f743b34c6e5caac21e44de48187e3 (patch)
tree4aba819ffb380484fedf12770fb5b7e8bddc5a04 /storage
parent86b4be8b2439880c7e40c157896e1b42caac4b86 (diff)
parent7d1dda33a7b4e1e1727947c0104ed86036a5bef4 (diff)
Merge pull request #8109 from vespa-engine/vekterli/fix-tsan-detected-data-races
Fix some ThreadSanitizer reported data races
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/teststorageapp.h3
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp15
-rw-r--r--storage/src/tests/storageserver/testvisitormessagesession.h4
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp20
-rw-r--r--storage/src/vespa/storage/common/storagelink.h5
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp2
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.h5
7 files changed, 30 insertions, 24 deletions
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 867ac9d38fc..e567206c371 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -29,6 +29,7 @@
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/base/testdocman.h>
+#include <atomic>
namespace storage {
@@ -49,7 +50,7 @@ protected:
document::TestDocMan _docMan;
TestNodeStateUpdater _nodeStateUpdater;
vespalib::string _configId;
- bool _initialized;
+ std::atomic<bool> _initialized;
public:
/**
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 61c55248370..f03d2fa3647 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -20,6 +20,7 @@
#include <vespa/persistence/spi/test.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/fastos/file.h>
+#include <atomic>
#include <vespa/log/log.h>
LOG_SETUP(".filestormanagertest");
@@ -668,8 +669,8 @@ class MessagePusherThread : public document::Runnable
public:
FileStorHandler& _handler;
Document::SP _doc;
- bool _done;
- bool _threadDone;
+ std::atomic<bool> _done;
+ std::atomic<bool> _threadDone;
MessagePusherThread(FileStorHandler& handler, Document::SP doc);
~MessagePusherThread();
@@ -698,10 +699,10 @@ public:
const uint32_t _threadId;
FileStorHandler& _handler;
std::atomic<uint32_t> _config;
- uint32_t _fetchedCount;
- bool _done;
- bool _failed;
- bool _threadDone;
+ std::atomic<uint32_t> _fetchedCount;
+ std::atomic<bool> _done;
+ std::atomic<bool> _failed;
+ std::atomic<bool> _threadDone;
MessageFetchingThread(FileStorHandler& handler)
: _threadId(handler.getNextStripeId(0)), _handler(handler), _config(0), _fetchedCount(0), _done(false),
@@ -766,7 +767,7 @@ FileStorManagerTest::testHandlerPausedMultiThread()
ResumeGuard guard = filestorHandler.pause();
thread._config.fetch_add(1);
uint32_t count = thread._fetchedCount;
- CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount);
+ CPPUNIT_ASSERT_EQUAL(count, thread._fetchedCount.load());
}
pushthread._done = true;
diff --git a/storage/src/tests/storageserver/testvisitormessagesession.h b/storage/src/tests/storageserver/testvisitormessagesession.h
index dc3bbef373c..a9d83ca6c1a 100644
--- a/storage/src/tests/storageserver/testvisitormessagesession.h
+++ b/storage/src/tests/storageserver/testvisitormessagesession.h
@@ -7,6 +7,8 @@
#include <vespa/documentapi/messagebus/messages/documentmessage.h>
#include <vespa/storage/storageserver/priorityconverter.h>
#include <vespa/config/subscription/configuri.h>
+
+#include <atomic>
#include <deque>
namespace storage {
@@ -23,7 +25,7 @@ public:
VisitorThread& thread;
Visitor& visitor;
- uint32_t pendingCount;
+ std::atomic<uint32_t> pendingCount;
~TestVisitorMessageSession();
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index b53c8c270f2..97b357a3bf7 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -18,9 +18,9 @@ StorageLink::~StorageLink() = default;
void StorageLink::push_back(StorageLink::UP link)
{
- if (_state != CREATED) {
+ if (getState() != CREATED) {
LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s",
- link->toString().c_str(), toString().c_str(), stateToString(_state));
+ link->toString().c_str(), toString().c_str(), stateToString(getState()));
assert(false);
}
assert(link);
@@ -39,9 +39,9 @@ void StorageLink::open()
// up, the link receiving them should have their state as opened.
StorageLink* link = this;
while (true) {
- if (link->_state != CREATED) {
+ if (link->getState() != CREATED) {
LOG(error, "During open(), link %s should be in CREATED state, not in state %s.",
- toString().c_str(), stateToString(link->_state));
+ toString().c_str(), stateToString(link->getState()));
assert(false);
}
link->_state = OPENED;
@@ -83,9 +83,9 @@ void StorageLink::closeNextLink() {
void StorageLink::flush()
{
- if (_state != CLOSING) {
+ if (getState() != CLOSING) {
LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.",
- toString().c_str(), stateToString(_state));
+ toString().c_str(), stateToString(getState()));
assert(false);
}
// First flush down to get all requests out of the system.
@@ -114,14 +114,14 @@ void StorageLink::flush()
void StorageLink::sendDown(const StorageMessage::SP& msg)
{
// Verify acceptable state to send messages down
- switch(_state) {
+ switch(getState()) {
case OPENED:
case CLOSING:
case FLUSHINGDOWN:
break;
default:
LOG(error, "Link %s trying to send %s down while in state %s",
- toString().c_str(), msg->toString().c_str(), stateToString(_state));
+ toString().c_str(), msg->toString().c_str(), stateToString(getState()));
assert(false);
}
assert(msg.get());
@@ -155,7 +155,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
{
// Verify acceptable state to send messages up
- switch(_state) {
+ switch(getState()) {
case OPENED:
case CLOSING:
case FLUSHINGDOWN:
@@ -163,7 +163,7 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
break;
default:
LOG(error, "Link %s trying to send %s up while in state %s",
- toString().c_str(), msg->toString(true).c_str(), stateToString(_state));
+ toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
assert(false);
}
assert(msg.get());
diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h
index 11705385646..8307c19dce6 100644
--- a/storage/src/vespa/storage/common/storagelink.h
+++ b/storage/src/vespa/storage/common/storagelink.h
@@ -23,6 +23,7 @@
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/document/util/printable.h>
+#include <atomic>
namespace storage {
@@ -41,7 +42,7 @@ private:
std::string _name;
StorageLink* _up;
std::unique_ptr<StorageLink> _down;
- State _state;
+ std::atomic<State> _state;
public:
StorageLink(const StorageLink &) = delete;
@@ -59,7 +60,7 @@ public:
void push_back(StorageLink::UP);
/** Get the current state of the storage link. */
- State getState() const { return _state; }
+ State getState() const noexcept { return _state.load(std::memory_order_relaxed); }
/**
* Called by storage server after the storage chain have been created.
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index d5b56c3ee53..1d33e829d49 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -202,7 +202,7 @@ VisitorThread::run(framework::ThreadHandle& thread)
tick();
vespalib::MonitorGuard guard(_queueMonitor);
if (_queue.empty()) {
- guard.wait(_timeBetweenTicks);
+ guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed));
thread.registerTick(framework::WAIT_CYCLE);
}
continue;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index a9dcd8f5806..2b15468fd3f 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -23,6 +23,7 @@
#include <vespa/metrics/metrictimer.h>
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/sync.h>
+#include <atomic>
#include <deque>
namespace storage {
@@ -85,7 +86,7 @@ class VisitorThread : public framework::Runnable,
uint32_t _visitorMemoryUsageLimit;
framework::MilliSecTime _defaultDocBlockTimeout;
framework::MilliSecTime _defaultVisitorInfoTimeout;
- uint32_t _timeBetweenTicks;
+ std::atomic<uint32_t> _timeBetweenTicks;
StorageComponent _component;
framework::Thread::UP _thread;
VisitorMessageSessionFactory& _messageSessionFactory;
@@ -102,7 +103,7 @@ public:
void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg);
void shutdown();
- void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks = time; }
+ void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); }
void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor);
/** For unit tests needing to pause thread. */