diff options
25 files changed, 145 insertions, 782 deletions
diff --git a/document/src/vespa/document/util/queue.h b/document/src/vespa/document/util/queue.h index 7e3c98333c8..770acda792d 100644 --- a/document/src/vespa/document/util/queue.h +++ b/document/src/vespa/document/util/queue.h @@ -2,7 +2,7 @@ #pragma once #include <queue> -#include <vespa/vespalib/util/sync.h> +#include <mutex> #define UNUSED_PARAM(p) namespace document { @@ -16,35 +16,32 @@ class Semaphore private: int _count; int _numWaiters; - vespalib::Monitor _sync; - - // assignment would be unsafe - Semaphore& operator= (const Semaphore& other); + std::mutex _lock; + std::condition_variable _cond; public: - // XXX is it really safe to just copy other._count here? - Semaphore(const Semaphore& other) : _count(other._count), _numWaiters(0), _sync() {} - - Semaphore(int count=0) : _count(count), _numWaiters(0), _sync() { } + Semaphore(int count=0) : _count(count), _numWaiters(0), _lock() { } - virtual ~Semaphore() { + ~Semaphore() { // XXX alternative: assert(_numWaiters == 0) while (true) { - vespalib::MonitorGuard guard(_sync); - if (_numWaiters == 0) break; - _count++; - guard.signal(); + { + std::lock_guard guard(_lock); + if (_numWaiters == 0) break; + _count++; + } + _cond.notify_one(); } } bool wait(int ms) { bool gotSemaphore = false; - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); if (_count == 0) { _numWaiters++; // we could retry if we get a signal but not the semaphore, // but then we risk waiting longer than expected, so // just ignore the return value here. - guard.wait(ms); + _cond.wait_for(guard, std::chrono::milliseconds(ms)); _numWaiters--; } if (_count > 0) { @@ -56,10 +53,10 @@ public: } bool wait() { - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); while (_count == 0) { _numWaiters++; - guard.wait(); + _cond.wait(guard); _numWaiters--; } _count--; @@ -68,11 +65,11 @@ public: } void post() { - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); assert(_count >= 0); _count++; if (_numWaiters > 0) { - guard.signal(); + _cond.notify_one(); } } }; @@ -82,12 +79,12 @@ template <typename T, typename Q=std::queue<T> > class QueueBase { public: - QueueBase() : _cond(), _count(0), _q() { } + QueueBase() : _lock(), _count(0), _q() { } virtual ~QueueBase() { } size_t size() const { return internal_size(); } bool empty() const { return size() == 0; } protected: - vespalib::Monitor _cond; + std::mutex _lock; document::Semaphore _count; Q _q; @@ -119,7 +116,7 @@ public: (void)timeout; bool retval; { - vespalib::MonitorGuard guard(this->_cond); + std::lock_guard guard(this->_lock); retval = this->internal_push(msg); } this->_count.post(); @@ -131,77 +128,12 @@ public: this->_count.wait() : this->_count.wait(timeout)); if ( retval ) { - vespalib::MonitorGuard guard(this->_cond); + std::lock_guard guard(this->_lock); retval = this->internal_pop(msg); } return retval; } }; -template <typename T, typename Q=std::queue<T> > -class QueueWithMax : public QueueBase<T, Q> -{ -protected: - size_t _size; - size_t storesize() const { return _size; } - virtual void add(const T& UNUSED_PARAM(msg)) { _size++; } - virtual void sub(const T& UNUSED_PARAM(msg)) { _size--; } -private: - size_t _max; - size_t _lowWaterMark; - int _writersWaiting; -public: - QueueWithMax(size_t max_=1000, size_t lowWaterMark_=500) - : QueueBase<T, Q>(), - _size(0), - _max(max_), - _lowWaterMark(lowWaterMark_), - _writersWaiting(0) - { } - bool push(const T& msg, int timeout=-1) - { - bool retval=true; - { - vespalib::MonitorGuard guard(this->_cond); - if (storesize() >= _max) { - ++_writersWaiting; - if (timeout >= 0) { - retval = guard.wait(timeout); - } else { - guard.wait(); - } - --_writersWaiting; - } - if (retval) { - retval = internal_push(msg); - } - if (retval) { - add(msg); - } - } - if (retval) { - this->_count.post(); - } - return retval; - } - bool pop(T& msg, int timeout=-1) - { - bool retval((timeout == -1) ? - this->_count.wait() : - this->_count.wait(timeout)); - if ( retval ) { - vespalib::MonitorGuard guard(this->_cond); - retval = internal_pop(msg); - if (retval) { - sub(msg); - if (_writersWaiting > 0 && storesize() < _lowWaterMark) { - guard.signal(); - } - } - } - return retval; - } -}; - } // namespace document diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 4579f7dec0e..9623fa59acf 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "messenger.h" -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/gate.h> #include <vespa/log/log.h> @@ -157,22 +156,23 @@ public: namespace mbus { Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) : - _monitor(), - _pool(128000), - _children(), - _queue(), - _closed(false), - _skip_request_thread(skip_request_thread), - _skip_reply_thread(skip_reply_thread) + _lock(), + _pool(128000), + _children(), + _queue(), + _closed(false), + _skip_request_thread(skip_request_thread), + _skip_reply_thread(skip_reply_thread) {} Messenger::~Messenger() { { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); _closed = true; - guard.broadcast(); } + _cond.notify_all(); + _pool.Close(); std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>()); if ( ! _queue.empty()) { @@ -194,12 +194,12 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg) while (true) { ITask::UP task; { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); if (_closed) { break; } if (_queue.empty()) { - guard.wait(100); + _cond.wait_for(guard, 100ms); } if (!_queue.empty()) { task.reset(_queue.front()); @@ -237,7 +237,7 @@ Messenger::discardRecurrentTasks() bool Messenger::start() { - if (_pool.NewThread(this) == 0) { + if (_pool.NewThread(this) == nullptr) { return false; } return true; @@ -266,11 +266,12 @@ Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) void Messenger::enqueue(ITask::UP task) { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); if (!_closed) { _queue.push(task.release()); if (_queue.size() == 1) { - guard.signal(); + guard.unlock(); + _cond.notify_one(); } } } @@ -286,7 +287,7 @@ Messenger::sync() bool Messenger::isEmpty() const { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); return _queue.empty(); } diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h index 3103e9afae1..7ca3749b970 100644 --- a/messagebus/src/vespa/messagebus/messenger.h +++ b/messagebus/src/vespa/messagebus/messenger.h @@ -6,7 +6,6 @@ #include "message.h" #include "reply.h" #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/fastos/thread.h> @@ -40,8 +39,9 @@ public: }; private: - vespalib::Monitor _monitor; - FastOS_ThreadPool _pool; + mutable std::mutex _lock; + std::condition_variable _cond; + FastOS_ThreadPool _pool; std::vector<ITask*> _children; vespalib::ArrayQueue<ITask*> _queue; bool _closed; diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index ea21010e21c..b91ba43f036 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -29,11 +29,11 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) ResolveState state = _state.load(std::memory_order_acquire); bool hasVersion = (state == VERSION_RESOLVED); if ( ! hasVersion ) { - vespalib::MonitorGuard guard(_lock); + std::unique_lock guard(_lock); state = _state.load(std::memory_order_relaxed); if (state == VERSION_RESOLVED || state == PROCESSING_HANDLERS) { while (_state.load(std::memory_order::memory_order_relaxed) == PROCESSING_HANDLERS) { - guard.wait(); + _cond.wait(guard); } hasVersion = true; } else { @@ -71,7 +71,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) { HandlerList handlers; { - vespalib::MonitorGuard guard(_lock); + std::lock_guard guard(_lock); assert(_state == TARGET_INVOKED); if (req->CheckReturnTypes("s")) { FRT_Values &val = *req->GetReturn(); @@ -90,10 +90,10 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) handler->handleVersion(_version.get()); } { - vespalib::MonitorGuard guard(_lock); + std::lock_guard guard(_lock); _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); - guard.broadcast(); } + _cond.notify_all(); req->SubRef(); } diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index d927292f26d..6a57bd983e7 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -5,7 +5,6 @@ #include <vespa/fnet/frt/invoker.h> #include <vespa/fnet/frt/target.h> #include <vespa/vespalib/component/version.h> -#include <vespa/vespalib/util/sync.h> namespace mbus { @@ -27,7 +26,7 @@ public: /** * Virtual destructor required for inheritance. */ - virtual ~IVersionHandler() { } + virtual ~IVersionHandler() = default; /** * This method is invoked once the version of the corresponding {@link @@ -50,7 +49,8 @@ private: }; typedef std::unique_ptr<vespalib::Version> Version_UP; - vespalib::Monitor _lock; + std::mutex _lock; + std::condition_variable _cond; FRT_Supervisor &_orb; string _name; FRT_Target &_target; diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp index 494ee11c7cc..9ec6d36f688 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.cpp +++ b/messagebus/src/vespa/messagebus/routablequeue.cpp @@ -7,7 +7,7 @@ using namespace std::chrono; namespace mbus { RoutableQueue::RoutableQueue() - : _monitor(), + : _lock(), _queue() { } @@ -23,18 +23,19 @@ RoutableQueue::~RoutableQueue() uint32_t RoutableQueue::size() { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); return _queue.size(); } void RoutableQueue::enqueue(Routable::UP r) { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); _queue.push(r.get()); r.release(); if (_queue.size() == 1) { - guard.broadcast(); // support multiple readers + guard.unlock(); + _cond.notify_all(); // support multiple readers } } @@ -43,9 +44,9 @@ RoutableQueue::dequeue(duration timeout) { steady_clock::time_point startTime = steady_clock::now(); duration left = timeout; - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); while (_queue.size() == 0 && left > duration::zero()) { - if (!guard.wait(left) || _queue.size() > 0) { + if ((_cond.wait_for(guard, left) == std::cv_status::no_timeout) || (_queue.size() > 0)) { break; } duration elapsed = (steady_clock::now() - startTime); diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h index 03f81a7a8d3..0c3edc1a597 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.h +++ b/messagebus/src/vespa/messagebus/routablequeue.h @@ -2,13 +2,14 @@ #pragma once -#include <vespa/vespalib/util/sync.h> #include "imessagehandler.h" #include "ireplyhandler.h" #include "queue.h" #include "routable.h" #include "message.h" #include "reply.h" +#include <mutex> +#include <condition_variable> namespace mbus { @@ -25,8 +26,9 @@ class RoutableQueue : public IMessageHandler, public IReplyHandler { private: - vespalib::Monitor _monitor; - Queue<Routable*> _queue; + std::mutex _lock; + std::condition_variable _cond; + Queue<Routable*> _queue; public: RoutableQueue(const RoutableQueue &) = delete; diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index eece44a922a..b22f684b848 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -13,7 +13,7 @@ using vespalib::make_string; namespace mbus { SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams ¶ms) - : _monitor(), + : _lock(), _mbus(mbus), _gate(new ReplyGate(_mbus)), _sequencer(*_gate), @@ -76,17 +76,17 @@ SourceSession::send(Message::UP msg) msg->setTimeRemaining(_timeout); } { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); if (_closed) { return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg)); } - if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) { + if (_throttlePolicy && !_throttlePolicy->canSend(*msg, _pendingCount)) { return Result(Error(ErrorCode::SEND_QUEUE_FULL, make_string("Too much pending data (%d messages).", _pendingCount)), std::move(msg)); } msg->pushHandler(_replyHandler); - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy) { _throttlePolicy->processMessage(*msg); } ++_pendingCount; @@ -106,10 +106,10 @@ SourceSession::handleReply(Reply::UP reply) { bool done; { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); assert(_pendingCount > 0); --_pendingCount; - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy) { _throttlePolicy->processReply(*reply); } done = (_closed && _pendingCount == 0); @@ -121,31 +121,33 @@ SourceSession::handleReply(Reply::UP reply) IReplyHandler &handler = reply->getCallStack().pop(*reply); handler.handleReply(std::move(reply)); if (done) { - vespalib::MonitorGuard guard(_monitor); - assert(_pendingCount == 0); - assert(_closed); - _done = true; - guard.broadcast(); + { + std::lock_guard guard(_lock); + assert(_pendingCount == 0); + assert(_closed); + _done = true; + } + _cond.notify_all(); } } void SourceSession::close() { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); _closed = true; if (_pendingCount == 0) { _done = true; } while (!_done) { - guard.wait(); + _cond.wait(guard); } } SourceSession & SourceSession::setTimeout(duration timeout) { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); _timeout = timeout; return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 0992a3e377b..c7dfcdf9337 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -1,11 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/vespalib/util/sync.h> #include "ireplyhandler.h" #include "result.h" #include "sequencer.h" #include "sourcesessionparams.h" +#include <condition_variable> namespace mbus { @@ -21,7 +21,8 @@ class SourceSession : public IReplyHandler { private: friend class MessageBus; - vespalib::Monitor _monitor; + std::mutex _lock; + std::condition_variable _cond; MessageBus &_mbus; ReplyGate *_gate; Sequencer _sequencer; diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp index 01d644bba09..a8199938a25 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp +++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp @@ -12,27 +12,27 @@ Receptor::~Receptor() = default; void Receptor::handleMessage(Message::UP msg) { - vespalib::MonitorGuard guard(_mon); + std::lock_guard guard(_mon); _msg = std::move(msg); - guard.broadcast(); + _cond.notify_all(); } void Receptor::handleReply(Reply::UP reply) { - vespalib::MonitorGuard guard(_mon); + std::lock_guard guard(_mon); _reply = std::move(reply); - guard.broadcast(); + _cond.notify_all(); } Message::UP Receptor::getMessage(duration maxWait) { steady_clock::time_point startTime = steady_clock::now(); - vespalib::MonitorGuard guard(_mon); - while (_msg.get() == 0) { + std::unique_lock guard(_mon); + while ( ! _msg) { duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); - if (w <= duration::zero() || !guard.wait(w)) { + if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) { break; } } @@ -43,10 +43,10 @@ Reply::UP Receptor::getReply(duration maxWait) { steady_clock::time_point startTime = steady_clock::now(); - vespalib::MonitorGuard guard(_mon); + std::unique_lock guard(_mon); while (_reply.get() == 0) { duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); - if (w <= duration::zero() || !guard.wait(w)) { + if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) { break; } } diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index 1d98ac62cd2..4e734319ca0 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -6,7 +6,7 @@ #include <vespa/messagebus/ireplyhandler.h> #include <vespa/messagebus/message.h> #include <vespa/messagebus/reply.h> -#include <vespa/vespalib/util/sync.h> +#include <condition_variable> namespace mbus { @@ -14,15 +14,13 @@ class Receptor : public IMessageHandler, public IReplyHandler { private: - vespalib::Monitor _mon; - Message::UP _msg; - Reply::UP _reply; - - Receptor(const Receptor &); - Receptor &operator=(const Receptor &); + std::mutex _mon; + std::condition_variable _cond; + Message::UP _msg; + Reply::UP _reply; public: Receptor(); - ~Receptor(); + ~Receptor() override; void handleMessage(Message::UP msg) override; void handleReply(Reply::UP reply) override; Message::UP getMessage(duration maxWait = 120s); diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 3285e03db67..54464105bb3 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -329,7 +329,7 @@ DummyPersistence::getPartitionStates() const { _initialized = true; LOG(debug, "getPartitionStates()"); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); return PartitionStateListResult(_partitions); } @@ -344,7 +344,7 @@ DummyPersistence::listBuckets(BucketSpace bucketSpace, PartitionId id) const { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "listBuckets(%u)", uint16_t(id)); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); BucketIdListResult::List list; if (bucketSpace == FixedBucketSpaces::default_space()) { for (PartitionContent::const_iterator it = _content[id].begin(); @@ -359,14 +359,14 @@ DummyPersistence::listBuckets(BucketSpace bucketSpace, PartitionId id) const void DummyPersistence::setModifiedBuckets(const BucketIdListResult::List& buckets) { - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); _modifiedBuckets = buckets; } BucketIdListResult DummyPersistence::getModifiedBuckets(BucketSpace bucketSpace) const { - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); if (bucketSpace == FixedBucketSpaces::default_space()) { return BucketIdListResult(_modifiedBuckets); } else { @@ -378,7 +378,7 @@ DummyPersistence::getModifiedBuckets(BucketSpace bucketSpace) const Result DummyPersistence::setClusterState(BucketSpace bucketSpace, const ClusterState& c) { - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); if (bucketSpace == FixedBucketSpaces::default_space()) { _clusterState.reset(new ClusterState(c)); if (!_clusterState->nodeUp()) { @@ -570,7 +570,7 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection Iterator* it; IteratorId id; { - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); id = _nextIterator; ++_nextIterator; assert(_iterators.find(id) == _iterators.end()); @@ -640,7 +640,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con ctx.trace(9, "started iterate()"); Iterator* it; { - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); std::map<IteratorId, Iterator::UP>::iterator iter(_iterators.find(id)); if (iter == _iterators.end()) { return IterateResult(Result::ErrorType::PERMANENT_ERROR, @@ -711,7 +711,7 @@ DummyPersistence::destroyIterator(IteratorId id, Context&) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "destroyIterator(%" PRIu64 ")", uint64_t(id)); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); if (_iterators.find(id) != _iterators.end()) { _iterators.erase(id); } @@ -724,7 +724,7 @@ DummyPersistence::createBucket(const Bucket& b, Context&) DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "createBucket(%s)", b.toString().c_str()); assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); if (_content[b.getPartition()].find(b) == _content[b.getPartition()].end()) { _content[b.getPartition()][b] = std::make_shared<BucketContent>(); } else { @@ -740,7 +740,7 @@ DummyPersistence::deleteBucket(const Bucket& b, Context&) DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); if (_content[b.getPartition()][b].get()) { assert(!_content[b.getPartition()][b]->_inUse); } @@ -904,7 +904,7 @@ DummyPersistence::dumpBucket(const Bucket& b) const DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(spam, "dumpBucket(%s)", b.toString().c_str()); assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); PartitionContent::const_iterator it(_content[b.getPartition()].find(b)); if (it == _content[b.getPartition()].end()) { return "DOESN'T EXIST"; @@ -924,7 +924,7 @@ DummyPersistence::isActive(const Bucket& b) const { DUMMYPERSISTENCE_VERIFY_INITIALIZED; assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); LOG(spam, "isActive(%s)", b.toString().c_str()); PartitionContent::const_iterator it(_content[b.getPartition()].find(b)); if (it == _content[b.getPartition()].end()) { @@ -942,7 +942,7 @@ BucketContentGuard::UP DummyPersistence::acquireBucketWithLock(const Bucket& b, LockMode lock_mode) const { assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - vespalib::MonitorGuard lock(_monitor); + std::lock_guard lock(_monitor); DummyPersistence& ncp(const_cast<DummyPersistence&>(*this)); PartitionContent::iterator it(ncp._content[b.getPartition()].find(b)); if (it == ncp._content[b.getPartition()].end()) { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c3a4991a590..5f9d2b6ddc3 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -11,10 +11,11 @@ #include <vespa/persistence/spi/abstractpersistenceprovider.h> #include <vespa/document/base/globalid.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/stllike/hash_map.h> #include <atomic> #include <map> +#include <mutex> +#include <condition_variable> namespace document { class DocumentTypeRepo; @@ -207,7 +208,8 @@ private: std::vector<PartitionContent> _content; IteratorId _nextIterator; mutable std::map<IteratorId, Iterator::UP> _iterators; - vespalib::Monitor _monitor; + mutable std::mutex _monitor; + std::condition_variable _cond; std::unique_ptr<ClusterState> _clusterState; diff --git a/storage/src/tests/common/dummystoragelink.cpp b/storage/src/tests/common/dummystoragelink.cpp index ab70bff3409..8d188002c67 100644 --- a/storage/src/tests/common/dummystoragelink.cpp +++ b/storage/src/tests/common/dummystoragelink.cpp @@ -63,12 +63,12 @@ bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd) } } if (isBottom()) { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); { std::lock_guard guard(_lock); _commands.push_back(cmd); } - lock.broadcast(); + _waitCond.notify_all(); return true; } return StorageLink::onDown(cmd); @@ -76,12 +76,12 @@ bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd) bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) { if (isTop()) { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); { std::lock_guard guard(_lock); _replies.push_back(reply); } - lock.broadcast(); + _waitCond.notify_all(); return true; } return StorageLink::onUp(reply); @@ -96,7 +96,7 @@ void DummyStorageLink::injectReply(api::StorageReply* reply) } void DummyStorageLink::reset() { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); std::lock_guard guard(_lock); _commands.clear(); _replies.clear(); @@ -106,11 +106,10 @@ void DummyStorageLink::reset() { void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout) { framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); - vespalib::MonitorGuard lock(_waitMonitor); + vespalib::steady_time endTime = clock.getMonotonicTime() + vespalib::from_s(timeout); + std::unique_lock guard(_waitMonitor); while (_commands.size() + _replies.size() < msgCount) { - if (timeout != 0 && clock.getTimeInMillis() > endTime) { + if (timeout != 0 && clock.getMonotonicTime() > endTime) { std::ostringstream ost; ost << "Timed out waiting for " << msgCount << " messages to " << "arrive in dummy storage link. Only " @@ -119,9 +118,9 @@ void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout) throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); } if (timeout >= 0) { - lock.wait((endTime - clock.getTimeInMillis()).getTime()); + _waitCond.wait_until(guard, endTime); } else { - lock.wait(); + _waitCond.wait(guard); } } } @@ -129,9 +128,8 @@ void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout) void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) { framework::defaultimplementation::RealClock clock; - framework::MilliSecTime endTime( - clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000)); - vespalib::MonitorGuard lock(_waitMonitor); + vespalib::steady_time endTime = clock.getMonotonicTime() + vespalib::from_s(timeout); + std::unique_lock lock(_waitMonitor); while (true) { for (uint32_t i=0; i<_commands.size(); ++i) { if (_commands[i]->getType() == type) return; @@ -139,7 +137,7 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) for (uint32_t i=0; i<_replies.size(); ++i) { if (_replies[i]->getType() == type) return; } - if (timeout != 0 && clock.getTimeInMillis() > endTime) { + if (timeout != 0 && clock.getMonotonicTime() > endTime) { std::ostringstream ost; ost << "Timed out waiting for " << type << " message to " << "arrive in dummy storage link. Only " @@ -154,9 +152,9 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); } if (timeout >= 0) { - lock.wait((endTime - clock.getTimeInMillis()).getTime()); + _waitCond.wait_until(lock, endTime); } else { - lock.wait(); + _waitCond.wait(lock); } } } @@ -164,7 +162,7 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout) api::StorageMessage::SP DummyStorageLink::getAndRemoveMessage(const api::MessageType& type) { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin(); it != _commands.end(); ++it) { diff --git a/storage/src/tests/common/dummystoragelink.h b/storage/src/tests/common/dummystoragelink.h index 46a1d4ea25f..5a0f1ad96b9 100644 --- a/storage/src/tests/common/dummystoragelink.h +++ b/storage/src/tests/common/dummystoragelink.h @@ -2,7 +2,6 @@ #pragma once -#include <vespa/vespalib/util/sync.h> #include <list> #include <sstream> #include <vespa/storageapi/messageapi/storagecommand.h> @@ -27,7 +26,8 @@ class DummyStorageLink : public StorageLink { bool _useDispatch; bool _ignore; static DummyStorageLink* _last; - vespalib::Monitor _waitMonitor; + std::mutex _waitMonitor; + std::condition_variable _waitCond; public: DummyStorageLink(); @@ -87,7 +87,7 @@ public: { return _replies; } std::vector<api::StorageMessage::SP> getCommandsOnce() { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); std::vector<api::StorageMessage::SP> retval; { std::lock_guard guard(_lock); @@ -97,7 +97,7 @@ public: } std::vector<api::StorageMessage::SP> getRepliesOnce() { - vespalib::MonitorGuard lock(_waitMonitor); + std::lock_guard lock(_waitMonitor); std::vector<api::StorageMessage::SP> retval; { std::lock_guard guard(_lock); diff --git a/storageserver/src/apps/storaged/storage.cpp b/storageserver/src/apps/storaged/storage.cpp index 903c61875ed..428067e3059 100644 --- a/storageserver/src/apps/storaged/storage.cpp +++ b/storageserver/src/apps/storaged/storage.cpp @@ -51,12 +51,13 @@ Process::UP createProcess(vespalib::stringref configId) { class StorageApp : public FastOS_Application, private vespalib::ProgramOptions { - std::string _configId; - bool _showSyntax; - uint32_t _maxShutdownTime; - int _lastSignal; - vespalib::Monitor _signalLock; - Process::UP _process; + std::string _configId; + bool _showSyntax; + uint32_t _maxShutdownTime; + int _lastSignal; + std::mutex _signalLock; + std::condition_variable _signalCond; + Process::UP _process; public: StorageApp(); @@ -64,11 +65,10 @@ public: void handleSignal(int signal) { LOG(info, "Got signal %d, waiting for lock", signal); - vespalib::MonitorGuard sync(_signalLock); - + std::lock_guard sync(_signalLock); LOG(info, "Got lock for signal %d", signal); _lastSignal = signal; - sync.signal(); + _signalCond.notify_one(); } void handleSignals(); @@ -103,8 +103,7 @@ StorageApp::~StorageApp() = default; bool StorageApp::Init() { FastOS_Application::Init(); - setCommandLineArguments( - FastOS_Application::_argc, FastOS_Application::_argv); + setCommandLineArguments(FastOS_Application::_argc, FastOS_Application::_argv); try{ parse(); } catch (vespalib::InvalidCommandLineArgumentsException& e) { @@ -192,9 +191,9 @@ int StorageApp::Main() ResumeGuard guard(_process->getNode().pause()); _process->updateConfig(); } - // Wait until we get a kill signal. - vespalib::MonitorGuard lock(_signalLock); - lock.wait(1000ms); + // Wait until we get a kill signal. + std::unique_lock guard(_signalLock); + _signalCond.wait_for(guard, 1000ms); handleSignals(); } LOG(debug, "Server was attempted stopped, shutting down"); diff --git a/vdslib/CMakeLists.txt b/vdslib/CMakeLists.txt index 3c1ee756e56..b66f53a4e19 100644 --- a/vdslib/CMakeLists.txt +++ b/vdslib/CMakeLists.txt @@ -14,7 +14,6 @@ vespa_define_module( src/vespa/vdslib/container src/vespa/vdslib/distribution src/vespa/vdslib/state - src/vespa/vdslib/thread TEST_DEPENDS vdstestlib @@ -24,5 +23,4 @@ vespa_define_module( src/tests/container src/tests/distribution src/tests/state - src/tests/thread ) diff --git a/vdslib/src/tests/CMakeLists.txt b/vdslib/src/tests/CMakeLists.txt index 6cf1ba5e33f..0b48d675ddc 100644 --- a/vdslib/src/tests/CMakeLists.txt +++ b/vdslib/src/tests/CMakeLists.txt @@ -9,7 +9,6 @@ vespa_add_executable(vdslib_gtest_runner_app TEST vdslib_containertest vdslib_testdistribution vdslib_teststate - vdslib_testthread GTest::GTest ) diff --git a/vdslib/src/tests/thread/.gitignore b/vdslib/src/tests/thread/.gitignore deleted file mode 100644 index 583460ae288..00000000000 --- a/vdslib/src/tests/thread/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.So -.depend -Makefile diff --git a/vdslib/src/tests/thread/CMakeLists.txt b/vdslib/src/tests/thread/CMakeLists.txt deleted file mode 100644 index bf2c8a41c9b..00000000000 --- a/vdslib/src/tests/thread/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(vdslib_testthread - SOURCES - taskschedulertest.cpp - DEPENDS - vdslib - GTest::GTest -) diff --git a/vdslib/src/tests/thread/taskschedulertest.cpp b/vdslib/src/tests/thread/taskschedulertest.cpp deleted file mode 100644 index 54877fae62b..00000000000 --- a/vdslib/src/tests/thread/taskschedulertest.cpp +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vdslib/thread/taskscheduler.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <thread> - -namespace vdslib { - -namespace { - -struct TestWatch : public TaskScheduler::Watch { - mutable std::mutex _lock; - uint64_t _time; - - TestWatch(uint64_t startTime = 0) : _time(startTime) {} - ~TestWatch() = default; - - TaskScheduler::Time getTime() const override { - std::lock_guard guard(_lock); - return _time; - } - - void increment(uint64_t ms) { - std::lock_guard guard(_lock); - _time += ms; - } - - void set(uint64_t ms) { - std::lock_guard guard(_lock); - _time = ms; - } -}; - -struct TestTask : public TaskScheduler::Task -{ - TestWatch& _watch; - uint64_t _executionTime; - uint64_t _maxRuns; - uint64_t _maxTime; - int64_t _result; - uint64_t _currentRuns; - std::string _name; - std::vector<std::string>* _register; - - TestTask(TestWatch& watch, uint64_t executionTime, uint64_t maxRuns, - uint64_t maxTime, int64_t result) - : _watch(watch), _executionTime(executionTime), _maxRuns(maxRuns), - _maxTime(maxTime), _result(result), _currentRuns(0), - _name(), _register(0) - { - } - - void registerCallsWithName(const std::string& name, - std::vector<std::string>& myregister) - { - _name = name; - _register = &myregister; - } - - int64_t run(TaskScheduler::Time currentTime) override { - // Emulate that we use time to run - _watch.increment(_executionTime); - if (_register != 0) { - std::ostringstream ost; - ost << currentTime; - if (_name.size() > 0) { - ost << " " << _name; - } - _register->push_back(ost.str()); - } - // If max runs, dont run anymore - if (++_currentRuns >= _maxRuns) { - //std::cerr << "Max runs run, returning 0\n"; - return 0; - } - // If we will go beyond max time, dont run anymore - if (_result > 0 && currentTime + _result > _maxTime) { - //std::cerr << "Max time spent, returning 0\n"; - return 0; - } - //std::cerr << "Executed test task. Returning " << _result << "\n"; - return _result; - } - -}; - -std::string join(std::vector<std::string>& v) { - std::ostringstream ost; - for (size_t i=0; i<v.size(); ++i) { - if (i != 0) ost << ","; - ost << v[i]; - } - return ost.str(); -} - -} - -TEST(TaskSchedulerTest, test_simple) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - scheduler.start(threadPool); - std::vector<std::string> calls; - - // Test that one can schedule a single task immediately - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 5, 1000, 0)); - task->registerCallsWithName("", calls); - scheduler.add(TestTask::UP(task)); - scheduler.waitForTaskCounterOfAtLeast(counter + 1); - EXPECT_EQ(std::string("0"), join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } - // Test that task is repeated at intervals if wanted. - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 5, 1000, -20)); - task->registerCallsWithName("", calls); - scheduler.add(TestTask::UP(task)); - for (uint32_t i = 1; i <= 5; ++i) { - scheduler.waitForTaskCounterOfAtLeast(counter + i); - watch.increment(100); - } - EXPECT_EQ(std::string("0,110,220,330,440"), - join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } - // Test that task scheduled at specific time works, and that if - // scheduled at specific time in the past/current, we're rerun at once. - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 4, 1000, 100)); - task->registerCallsWithName("", calls); - scheduler.addAbsolute(TestTask::UP(task), 50); - watch.increment(49); // Not yet time to run - std::this_thread::sleep_for(5ms); - // Check that it has not run yet.. - EXPECT_EQ(counter, scheduler.getTaskCounter()); - watch.increment(10); // Now time is enough for it to run - scheduler.waitForTaskCounterOfAtLeast(counter + 1); - watch.increment(10); - std::this_thread::sleep_for(5ms); - // Check that it has not run yet.. - EXPECT_EQ(counter + 1, scheduler.getTaskCounter()); - watch.increment(50); - scheduler.waitForTaskCounterOfAtLeast(counter + 2); - EXPECT_EQ(std::string("59,129,129,129"), - join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } -} - -TEST(TaskSchedulerTest, test_multiple_tasks_at_same_time) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - std::vector<std::string> calls; - - // Test that tasks deleted before they are run are automatically - // cancelled and removed from scheduler - { - TestTask* task1(new TestTask(watch, 10, 3, 1000, 10)); - TestTask* task2(new TestTask(watch, 10, 3, 1000, 10)); - task1->registerCallsWithName("task1", calls); - task2->registerCallsWithName("task2", calls); - watch.set(10); - scheduler.add(TestTask::UP(task1)); - scheduler.add(TestTask::UP(task2)); - // Start threadpool after adding both, such that we ensure both - // are added at the same time interval - scheduler.start(threadPool); - - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - std::ostringstream ost; - for (size_t i=0; i<calls.size(); ++i) ost << calls[i] << "\n"; - - EXPECT_EQ(std::string( - "10 task1\n" - "10 task2\n" - "10 task1\n" - "10 task2\n" - "10 task1\n" - "10 task2\n" - ), ost.str()); - } -} - -TEST(TaskSchedulerTest, test_remove_task) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - scheduler.start(threadPool); - std::vector<std::string> calls; - - // Schedule a task, and remove it.. - { - calls.clear(); - watch.set(0); - TestTask* task(new TestTask(watch, 10, 5, 1000, 0)); - task->registerCallsWithName("", calls); - scheduler.addAbsolute(TestTask::UP(task), 50); - // Remove actual task - scheduler.remove(task); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - // Remove non-existing task - task = new TestTask(watch, 10, 5, 1000, 0); - scheduler.remove(task); - delete task; - // Time should not be advanced as task didn't get to run - EXPECT_EQ(0, (int) watch.getTime()); - } -} - -} diff --git a/vdslib/src/vespa/vdslib/CMakeLists.txt b/vdslib/src/vespa/vdslib/CMakeLists.txt index cf5053a5ceb..ea19664a45f 100644 --- a/vdslib/src/vespa/vdslib/CMakeLists.txt +++ b/vdslib/src/vespa/vdslib/CMakeLists.txt @@ -4,7 +4,6 @@ vespa_add_library(vdslib $<TARGET_OBJECTS:vdslib_container> $<TARGET_OBJECTS:vdslib_state> $<TARGET_OBJECTS:vdslib_distribution> - $<TARGET_OBJECTS:vdslib_thread> INSTALL lib64 DEPENDS ) diff --git a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt b/vdslib/src/vespa/vdslib/thread/CMakeLists.txt deleted file mode 100644 index 656772afc49..00000000000 --- a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(vdslib_thread OBJECT - SOURCES - taskscheduler.cpp - DEPENDS -) diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp b/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp deleted file mode 100644 index 08c7b80e406..00000000000 --- a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "taskscheduler.h" -#include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <sys/time.h> - -namespace vdslib { - -uint64_t -TaskScheduler::Watch::getTime() const -{ - struct timeval mytime; - gettimeofday(&mytime, 0); - return mytime.tv_sec * 1000llu + mytime.tv_usec / 1000; -} - -TaskScheduler::TaskScheduler() - : _lock(), - _defaultWatch(), - _watch(&_defaultWatch), - _tasks(), - _currentRunningTasks(), - _taskCounter(0) -{ -} - -bool TaskScheduler::onStop() -{ - vespalib::MonitorGuard guard(_lock); - guard.broadcast(); - return true; -} - -TaskScheduler::~TaskScheduler() -{ - stop(); - { - vespalib::MonitorGuard guard(_lock); - guard.broadcast(); - } - join(); - for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end(); ++it) { - TaskVector & v(it->second); - for (TaskVector::iterator it2 = v.begin(); it2 != v.end(); ++it2) { - delete *it2; - } - } -} - -void -TaskScheduler::add(Task::UP task) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[_watch->getTime()]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -void -TaskScheduler::addRelative(Task::UP task, Time timeDiff) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[_watch->getTime() + timeDiff]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -void -TaskScheduler::addAbsolute(Task::UP task, Time time) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[time]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -namespace { - template<typename T> - bool contains(const std::vector<T>& source, const T& element) { - for (size_t i = 0, n = source.size(); i<n; ++i) { - if (source[i] == element) return true; - } - return false; - } - - template<typename T> - void erase(std::vector<T>& source, const T& element) { - std::vector<T> result; - result.reserve(source.size()); - for (size_t i = 0, n = source.size(); i<n; ++i) { - if (source[i] != element) result.push_back(source[i]); - } - result.swap(source); - } -} - -void -TaskScheduler::remove(Task* task) -{ - vespalib::MonitorGuard guard(_lock); - while (contains(_currentRunningTasks, task)) { - guard.wait(); - } - for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end();) { - if (contains(it->second, task)) { - erase(it->second, task); - if (it->second.size() == 0) _tasks.erase(it); - delete task; - break; - } - ++it; - } -} - -void -TaskScheduler::setWatch(const Watch& watch) -{ - vespalib::MonitorGuard guard(_lock); - _watch = &watch; -} - -TaskScheduler::Time -TaskScheduler::getTime() const -{ - vespalib::MonitorGuard guard(_lock); - return _watch->getTime(); -} - -uint64_t -TaskScheduler::getTaskCounter() const -{ - vespalib::MonitorGuard guard(_lock); - return _taskCounter; -} - -void -TaskScheduler::waitForTaskCounterOfAtLeast(uint64_t taskCounter, - uint64_t timeout) const -{ - vespalib::MonitorGuard guard(_lock); - uint64_t currentTime = _defaultWatch.getTime(); - uint64_t endTime = currentTime + timeout; - while (_taskCounter < taskCounter) { - if (endTime <= currentTime) { - vespalib::asciistream ost; - ost << "Task scheduler not reached task counter of " << taskCounter - << " within timeout of " << timeout << " ms. Current task" - << " counter is " << _taskCounter; - throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); - } - guard.wait(endTime - currentTime); - currentTime = _defaultWatch.getTime(); - } -} - -void -TaskScheduler::waitUntilNoTasksRemaining(uint64_t timeout) const -{ - vespalib::MonitorGuard guard(_lock); - uint64_t currentTime = _defaultWatch.getTime(); - uint64_t endTime = currentTime + timeout; - while (_tasks.size() > 0 || _currentRunningTasks.size() > 0) { - if (endTime <= currentTime) { - vespalib::asciistream ost; - ost << "Task scheduler still have tasks scheduled after timeout" - << " of " << timeout << " ms. There are " << _tasks.size() - << " entries in tasks map and " << _currentRunningTasks.size() - << " tasks currently scheduled to run."; - throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); - } - guard.wait(endTime - currentTime); - currentTime = _defaultWatch.getTime(); - } -} - -void -TaskScheduler::run() -{ - while (1) { - vespalib::MonitorGuard guard(_lock); - if (!running()) return; - Time time = _watch->getTime(); - TaskMap::iterator next = _tasks.begin(); - if (next == _tasks.end()) { - guard.wait(); - continue; - } - if (next->first > time) { - guard.wait(next->first - time); - continue; - } - TaskVector taskList(next->second); - _currentRunningTasks.swap(next->second); - _tasks.erase(next); - guard.unlock(); - for (size_t i=0; i<taskList.size(); ++i) { - int64_t result = taskList[i]->run(time); - if (result < 0) { - addAbsolute(Task::UP(taskList[i]), - time + (-1 * result)); - } else if (result > 0) { - if (static_cast<Time>(result) <= time) { - taskList.push_back(taskList[i]); - } else { - addAbsolute(Task::UP(taskList[i]), result); - } - } else { - delete taskList[i]; - } - } - vespalib::MonitorGuard guard2(_lock); - if (!running()) return; - _taskCounter += _currentRunningTasks.size(); - _currentRunningTasks.clear(); - guard2.broadcast(); - } -} - -} // vdslib diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.h b/vdslib/src/vespa/vdslib/thread/taskscheduler.h deleted file mode 100644 index b34d633e624..00000000000 --- a/vdslib/src/vespa/vdslib/thread/taskscheduler.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class vespalib::TaskScheduler - * \ingroup util - * - * \brief Class to register tasks in to get them run in a separate thread. - * - * Imported to vdslib as C++ document API needs an independent thread to run - * events in, as it was subject to errors to use FNET event thread. Converted - * this class used in storage API client code before. - */ -#pragma once - -#include <vespa/vespalib/util/document_runnable.h> -#include <map> -#include <memory> -#include <vector> -#include <vespa/vespalib/util/sync.h> - -namespace vdslib { - -class TaskScheduler : public document::Runnable -{ -public: - typedef uint64_t Time; - - struct Task { - typedef std::unique_ptr<Task> UP; - virtual ~Task() {} - /** - * Return 0 to unregister this task. Return a negative number to get a - * new callback in that many (times -1) milliseconds. Return a positive - * number to get a callback as soon as thread is available after that - * absolute point in time (in milliseconds). If returning current time - * or before, this task will be scheduled to be rerun immediately - * (after other already waiting tasks have had a chance to run). - * The current time for the scheduler is given to the task. - */ - virtual int64_t run(Time) = 0; - }; - - /** - * If you want to fake time (useful for testing), implement your own watch - * for the scheduler to use. - */ - struct Watch { - virtual ~Watch() {} - virtual Time getTime() const; // In ms since 1970 - }; - - /** Creates a task scheduler. Remember to call start() to get it going. */ - TaskScheduler(); - ~TaskScheduler(); - - /** Register a task for immediate execution */ - void add(Task::UP); - - /** Register a task to be run in a given number of milliseconds from now */ - void addRelative(Task::UP, Time); - - /** Register a task to be run at given absolute time in milliseconds */ - void addAbsolute(Task::UP, Time); - - /** - * Removes a scheduled task from the scheduler. Note that this is - * currently not efficiently implemented but an exhaustive iteration of - * current tasks. Assuming number of tasks is small so this doesn't matter. - * If current task is running while this is called, function will block - * until it has completed before removing it. (To be safe if you want to - * delete task after.) - */ - void remove(Task*); - - /** Get the schedulers current time. */ - Time getTime() const; - - /** Set a custom watch to be used for this scheduler (Useful for testing) */ - void setWatch(const Watch& watch); - - /** Returns a number of current task */ - uint64_t getTaskCounter() const; - - /** Wait until a given number of tasks have been completed */ - void waitForTaskCounterOfAtLeast(uint64_t taskCounter, - uint64_t timeout = 5000) const; - - /** Call this to wait until no tasks are scheduled (Useful for testing) */ - void waitUntilNoTasksRemaining(uint64_t timeout = 5000) const; - -private: - typedef std::vector<Task*> TaskVector; - typedef std::map<Time, TaskVector> TaskMap; - vespalib::Monitor _lock; - Watch _defaultWatch; - const Watch* _watch; - TaskMap _tasks; - TaskVector _currentRunningTasks; - uint64_t _taskCounter; - - void run() override; - bool onStop() override; -}; - -} // vdslib - |