summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--document/src/vespa/document/util/queue.h110
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp33
-rw-r--r--messagebus/src/vespa/messagebus/messenger.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h6
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.cpp13
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.h8
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp30
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.h5
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.h14
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp26
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h6
-rw-r--r--storage/src/tests/common/dummystoragelink.cpp34
-rw-r--r--storage/src/tests/common/dummystoragelink.h8
-rw-r--r--storageserver/src/apps/storaged/storage.cpp27
-rw-r--r--vdslib/CMakeLists.txt2
-rw-r--r--vdslib/src/tests/CMakeLists.txt1
-rw-r--r--vdslib/src/tests/thread/.gitignore3
-rw-r--r--vdslib/src/tests/thread/CMakeLists.txt8
-rw-r--r--vdslib/src/tests/thread/taskschedulertest.cpp227
-rw-r--r--vdslib/src/vespa/vdslib/CMakeLists.txt1
-rw-r--r--vdslib/src/vespa/vdslib/thread/CMakeLists.txt6
-rw-r--r--vdslib/src/vespa/vdslib/thread/taskscheduler.cpp220
-rw-r--r--vdslib/src/vespa/vdslib/thread/taskscheduler.h105
25 files changed, 145 insertions, 782 deletions
diff --git a/document/src/vespa/document/util/queue.h b/document/src/vespa/document/util/queue.h
index 7e3c98333c8..770acda792d 100644
--- a/document/src/vespa/document/util/queue.h
+++ b/document/src/vespa/document/util/queue.h
@@ -2,7 +2,7 @@
#pragma once
#include <queue>
-#include <vespa/vespalib/util/sync.h>
+#include <mutex>
#define UNUSED_PARAM(p)
namespace document {
@@ -16,35 +16,32 @@ class Semaphore
private:
int _count;
int _numWaiters;
- vespalib::Monitor _sync;
-
- // assignment would be unsafe
- Semaphore& operator= (const Semaphore& other);
+ std::mutex _lock;
+ std::condition_variable _cond;
public:
- // XXX is it really safe to just copy other._count here?
- Semaphore(const Semaphore& other) : _count(other._count), _numWaiters(0), _sync() {}
-
- Semaphore(int count=0) : _count(count), _numWaiters(0), _sync() { }
+ Semaphore(int count=0) : _count(count), _numWaiters(0), _lock() { }
- virtual ~Semaphore() {
+ ~Semaphore() {
// XXX alternative: assert(_numWaiters == 0)
while (true) {
- vespalib::MonitorGuard guard(_sync);
- if (_numWaiters == 0) break;
- _count++;
- guard.signal();
+ {
+ std::lock_guard guard(_lock);
+ if (_numWaiters == 0) break;
+ _count++;
+ }
+ _cond.notify_one();
}
}
bool wait(int ms) {
bool gotSemaphore = false;
- vespalib::MonitorGuard guard(_sync);
+ std::unique_lock guard(_lock);
if (_count == 0) {
_numWaiters++;
// we could retry if we get a signal but not the semaphore,
// but then we risk waiting longer than expected, so
// just ignore the return value here.
- guard.wait(ms);
+ _cond.wait_for(guard, std::chrono::milliseconds(ms));
_numWaiters--;
}
if (_count > 0) {
@@ -56,10 +53,10 @@ public:
}
bool wait() {
- vespalib::MonitorGuard guard(_sync);
+ std::unique_lock guard(_lock);
while (_count == 0) {
_numWaiters++;
- guard.wait();
+ _cond.wait(guard);
_numWaiters--;
}
_count--;
@@ -68,11 +65,11 @@ public:
}
void post() {
- vespalib::MonitorGuard guard(_sync);
+ std::unique_lock guard(_lock);
assert(_count >= 0);
_count++;
if (_numWaiters > 0) {
- guard.signal();
+ _cond.notify_one();
}
}
};
@@ -82,12 +79,12 @@ template <typename T, typename Q=std::queue<T> >
class QueueBase
{
public:
- QueueBase() : _cond(), _count(0), _q() { }
+ QueueBase() : _lock(), _count(0), _q() { }
virtual ~QueueBase() { }
size_t size() const { return internal_size(); }
bool empty() const { return size() == 0; }
protected:
- vespalib::Monitor _cond;
+ std::mutex _lock;
document::Semaphore _count;
Q _q;
@@ -119,7 +116,7 @@ public:
(void)timeout;
bool retval;
{
- vespalib::MonitorGuard guard(this->_cond);
+ std::lock_guard guard(this->_lock);
retval = this->internal_push(msg);
}
this->_count.post();
@@ -131,77 +128,12 @@ public:
this->_count.wait() :
this->_count.wait(timeout));
if ( retval ) {
- vespalib::MonitorGuard guard(this->_cond);
+ std::lock_guard guard(this->_lock);
retval = this->internal_pop(msg);
}
return retval;
}
};
-template <typename T, typename Q=std::queue<T> >
-class QueueWithMax : public QueueBase<T, Q>
-{
-protected:
- size_t _size;
- size_t storesize() const { return _size; }
- virtual void add(const T& UNUSED_PARAM(msg)) { _size++; }
- virtual void sub(const T& UNUSED_PARAM(msg)) { _size--; }
-private:
- size_t _max;
- size_t _lowWaterMark;
- int _writersWaiting;
-public:
- QueueWithMax(size_t max_=1000, size_t lowWaterMark_=500)
- : QueueBase<T, Q>(),
- _size(0),
- _max(max_),
- _lowWaterMark(lowWaterMark_),
- _writersWaiting(0)
- { }
- bool push(const T& msg, int timeout=-1)
- {
- bool retval=true;
- {
- vespalib::MonitorGuard guard(this->_cond);
- if (storesize() >= _max) {
- ++_writersWaiting;
- if (timeout >= 0) {
- retval = guard.wait(timeout);
- } else {
- guard.wait();
- }
- --_writersWaiting;
- }
- if (retval) {
- retval = internal_push(msg);
- }
- if (retval) {
- add(msg);
- }
- }
- if (retval) {
- this->_count.post();
- }
- return retval;
- }
- bool pop(T& msg, int timeout=-1)
- {
- bool retval((timeout == -1) ?
- this->_count.wait() :
- this->_count.wait(timeout));
- if ( retval ) {
- vespalib::MonitorGuard guard(this->_cond);
- retval = internal_pop(msg);
- if (retval) {
- sub(msg);
- if (_writersWaiting > 0 && storesize() < _lowWaterMark) {
- guard.signal();
- }
- }
- }
- return retval;
- }
-};
-
} // namespace document
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 4579f7dec0e..9623fa59acf 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "messenger.h"
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/log/log.h>
@@ -157,22 +156,23 @@ public:
namespace mbus {
Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) :
- _monitor(),
- _pool(128000),
- _children(),
- _queue(),
- _closed(false),
- _skip_request_thread(skip_request_thread),
- _skip_reply_thread(skip_reply_thread)
+ _lock(),
+ _pool(128000),
+ _children(),
+ _queue(),
+ _closed(false),
+ _skip_request_thread(skip_request_thread),
+ _skip_reply_thread(skip_reply_thread)
{}
Messenger::~Messenger()
{
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
_closed = true;
- guard.broadcast();
}
+ _cond.notify_all();
+
_pool.Close();
std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>());
if ( ! _queue.empty()) {
@@ -194,12 +194,12 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg)
while (true) {
ITask::UP task;
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
if (_closed) {
break;
}
if (_queue.empty()) {
- guard.wait(100);
+ _cond.wait_for(guard, 100ms);
}
if (!_queue.empty()) {
task.reset(_queue.front());
@@ -237,7 +237,7 @@ Messenger::discardRecurrentTasks()
bool
Messenger::start()
{
- if (_pool.NewThread(this) == 0) {
+ if (_pool.NewThread(this) == nullptr) {
return false;
}
return true;
@@ -266,11 +266,12 @@ Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
void
Messenger::enqueue(ITask::UP task)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
if (!_closed) {
_queue.push(task.release());
if (_queue.size() == 1) {
- guard.signal();
+ guard.unlock();
+ _cond.notify_one();
}
}
}
@@ -286,7 +287,7 @@ Messenger::sync()
bool
Messenger::isEmpty() const
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
return _queue.empty();
}
diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h
index 3103e9afae1..7ca3749b970 100644
--- a/messagebus/src/vespa/messagebus/messenger.h
+++ b/messagebus/src/vespa/messagebus/messenger.h
@@ -6,7 +6,6 @@
#include "message.h"
#include "reply.h"
#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/fastos/thread.h>
@@ -40,8 +39,9 @@ public:
};
private:
- vespalib::Monitor _monitor;
- FastOS_ThreadPool _pool;
+ mutable std::mutex _lock;
+ std::condition_variable _cond;
+ FastOS_ThreadPool _pool;
std::vector<ITask*> _children;
vespalib::ArrayQueue<ITask*> _queue;
bool _closed;
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index ea21010e21c..b91ba43f036 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -29,11 +29,11 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler)
ResolveState state = _state.load(std::memory_order_acquire);
bool hasVersion = (state == VERSION_RESOLVED);
if ( ! hasVersion ) {
- vespalib::MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
state = _state.load(std::memory_order_relaxed);
if (state == VERSION_RESOLVED || state == PROCESSING_HANDLERS) {
while (_state.load(std::memory_order::memory_order_relaxed) == PROCESSING_HANDLERS) {
- guard.wait();
+ _cond.wait(guard);
}
hasVersion = true;
} else {
@@ -71,7 +71,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
{
HandlerList handlers;
{
- vespalib::MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
assert(_state == TARGET_INVOKED);
if (req->CheckReturnTypes("s")) {
FRT_Values &val = *req->GetReturn();
@@ -90,10 +90,10 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
handler->handleVersion(_version.get());
}
{
- vespalib::MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
_state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED);
- guard.broadcast();
}
+ _cond.notify_all();
req->SubRef();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index d927292f26d..6a57bd983e7 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -5,7 +5,6 @@
#include <vespa/fnet/frt/invoker.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/vespalib/component/version.h>
-#include <vespa/vespalib/util/sync.h>
namespace mbus {
@@ -27,7 +26,7 @@ public:
/**
* Virtual destructor required for inheritance.
*/
- virtual ~IVersionHandler() { }
+ virtual ~IVersionHandler() = default;
/**
* This method is invoked once the version of the corresponding {@link
@@ -50,7 +49,8 @@ private:
};
typedef std::unique_ptr<vespalib::Version> Version_UP;
- vespalib::Monitor _lock;
+ std::mutex _lock;
+ std::condition_variable _cond;
FRT_Supervisor &_orb;
string _name;
FRT_Target &_target;
diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp
index 494ee11c7cc..9ec6d36f688 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.cpp
+++ b/messagebus/src/vespa/messagebus/routablequeue.cpp
@@ -7,7 +7,7 @@ using namespace std::chrono;
namespace mbus {
RoutableQueue::RoutableQueue()
- : _monitor(),
+ : _lock(),
_queue()
{ }
@@ -23,18 +23,19 @@ RoutableQueue::~RoutableQueue()
uint32_t
RoutableQueue::size()
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
return _queue.size();
}
void
RoutableQueue::enqueue(Routable::UP r)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
_queue.push(r.get());
r.release();
if (_queue.size() == 1) {
- guard.broadcast(); // support multiple readers
+ guard.unlock();
+ _cond.notify_all(); // support multiple readers
}
}
@@ -43,9 +44,9 @@ RoutableQueue::dequeue(duration timeout)
{
steady_clock::time_point startTime = steady_clock::now();
duration left = timeout;
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
while (_queue.size() == 0 && left > duration::zero()) {
- if (!guard.wait(left) || _queue.size() > 0) {
+ if ((_cond.wait_for(guard, left) == std::cv_status::no_timeout) || (_queue.size() > 0)) {
break;
}
duration elapsed = (steady_clock::now() - startTime);
diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h
index 03f81a7a8d3..0c3edc1a597 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.h
+++ b/messagebus/src/vespa/messagebus/routablequeue.h
@@ -2,13 +2,14 @@
#pragma once
-#include <vespa/vespalib/util/sync.h>
#include "imessagehandler.h"
#include "ireplyhandler.h"
#include "queue.h"
#include "routable.h"
#include "message.h"
#include "reply.h"
+#include <mutex>
+#include <condition_variable>
namespace mbus {
@@ -25,8 +26,9 @@ class RoutableQueue : public IMessageHandler,
public IReplyHandler
{
private:
- vespalib::Monitor _monitor;
- Queue<Routable*> _queue;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ Queue<Routable*> _queue;
public:
RoutableQueue(const RoutableQueue &) = delete;
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index eece44a922a..b22f684b848 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -13,7 +13,7 @@ using vespalib::make_string;
namespace mbus {
SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams &params)
- : _monitor(),
+ : _lock(),
_mbus(mbus),
_gate(new ReplyGate(_mbus)),
_sequencer(*_gate),
@@ -76,17 +76,17 @@ SourceSession::send(Message::UP msg)
msg->setTimeRemaining(_timeout);
}
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
if (_closed) {
return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg));
}
- if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) {
+ if (_throttlePolicy && !_throttlePolicy->canSend(*msg, _pendingCount)) {
return Result(Error(ErrorCode::SEND_QUEUE_FULL,
make_string("Too much pending data (%d messages).", _pendingCount)),
std::move(msg));
}
msg->pushHandler(_replyHandler);
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy) {
_throttlePolicy->processMessage(*msg);
}
++_pendingCount;
@@ -106,10 +106,10 @@ SourceSession::handleReply(Reply::UP reply)
{
bool done;
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
assert(_pendingCount > 0);
--_pendingCount;
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy) {
_throttlePolicy->processReply(*reply);
}
done = (_closed && _pendingCount == 0);
@@ -121,31 +121,33 @@ SourceSession::handleReply(Reply::UP reply)
IReplyHandler &handler = reply->getCallStack().pop(*reply);
handler.handleReply(std::move(reply));
if (done) {
- vespalib::MonitorGuard guard(_monitor);
- assert(_pendingCount == 0);
- assert(_closed);
- _done = true;
- guard.broadcast();
+ {
+ std::lock_guard guard(_lock);
+ assert(_pendingCount == 0);
+ assert(_closed);
+ _done = true;
+ }
+ _cond.notify_all();
}
}
void
SourceSession::close()
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
_closed = true;
if (_pendingCount == 0) {
_done = true;
}
while (!_done) {
- guard.wait();
+ _cond.wait(guard);
}
}
SourceSession &
SourceSession::setTimeout(duration timeout)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
_timeout = timeout;
return *this;
}
diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h
index 0992a3e377b..c7dfcdf9337 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.h
+++ b/messagebus/src/vespa/messagebus/sourcesession.h
@@ -1,11 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vespalib/util/sync.h>
#include "ireplyhandler.h"
#include "result.h"
#include "sequencer.h"
#include "sourcesessionparams.h"
+#include <condition_variable>
namespace mbus {
@@ -21,7 +21,8 @@ class SourceSession : public IReplyHandler {
private:
friend class MessageBus;
- vespalib::Monitor _monitor;
+ std::mutex _lock;
+ std::condition_variable _cond;
MessageBus &_mbus;
ReplyGate *_gate;
Sequencer _sequencer;
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
index 01d644bba09..a8199938a25 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
@@ -12,27 +12,27 @@ Receptor::~Receptor() = default;
void
Receptor::handleMessage(Message::UP msg)
{
- vespalib::MonitorGuard guard(_mon);
+ std::lock_guard guard(_mon);
_msg = std::move(msg);
- guard.broadcast();
+ _cond.notify_all();
}
void
Receptor::handleReply(Reply::UP reply)
{
- vespalib::MonitorGuard guard(_mon);
+ std::lock_guard guard(_mon);
_reply = std::move(reply);
- guard.broadcast();
+ _cond.notify_all();
}
Message::UP
Receptor::getMessage(duration maxWait)
{
steady_clock::time_point startTime = steady_clock::now();
- vespalib::MonitorGuard guard(_mon);
- while (_msg.get() == 0) {
+ std::unique_lock guard(_mon);
+ while ( ! _msg) {
duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
- if (w <= duration::zero() || !guard.wait(w)) {
+ if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) {
break;
}
}
@@ -43,10 +43,10 @@ Reply::UP
Receptor::getReply(duration maxWait)
{
steady_clock::time_point startTime = steady_clock::now();
- vespalib::MonitorGuard guard(_mon);
+ std::unique_lock guard(_mon);
while (_reply.get() == 0) {
duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
- if (w <= duration::zero() || !guard.wait(w)) {
+ if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) {
break;
}
}
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h
index 1d98ac62cd2..4e734319ca0 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.h
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.h
@@ -6,7 +6,7 @@
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/reply.h>
-#include <vespa/vespalib/util/sync.h>
+#include <condition_variable>
namespace mbus {
@@ -14,15 +14,13 @@ class Receptor : public IMessageHandler,
public IReplyHandler
{
private:
- vespalib::Monitor _mon;
- Message::UP _msg;
- Reply::UP _reply;
-
- Receptor(const Receptor &);
- Receptor &operator=(const Receptor &);
+ std::mutex _mon;
+ std::condition_variable _cond;
+ Message::UP _msg;
+ Reply::UP _reply;
public:
Receptor();
- ~Receptor();
+ ~Receptor() override;
void handleMessage(Message::UP msg) override;
void handleReply(Reply::UP reply) override;
Message::UP getMessage(duration maxWait = 120s);
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 3285e03db67..54464105bb3 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -329,7 +329,7 @@ DummyPersistence::getPartitionStates() const
{
_initialized = true;
LOG(debug, "getPartitionStates()");
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
return PartitionStateListResult(_partitions);
}
@@ -344,7 +344,7 @@ DummyPersistence::listBuckets(BucketSpace bucketSpace, PartitionId id) const
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "listBuckets(%u)", uint16_t(id));
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
BucketIdListResult::List list;
if (bucketSpace == FixedBucketSpaces::default_space()) {
for (PartitionContent::const_iterator it = _content[id].begin();
@@ -359,14 +359,14 @@ DummyPersistence::listBuckets(BucketSpace bucketSpace, PartitionId id) const
void
DummyPersistence::setModifiedBuckets(const BucketIdListResult::List& buckets)
{
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
_modifiedBuckets = buckets;
}
BucketIdListResult
DummyPersistence::getModifiedBuckets(BucketSpace bucketSpace) const
{
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
if (bucketSpace == FixedBucketSpaces::default_space()) {
return BucketIdListResult(_modifiedBuckets);
} else {
@@ -378,7 +378,7 @@ DummyPersistence::getModifiedBuckets(BucketSpace bucketSpace) const
Result
DummyPersistence::setClusterState(BucketSpace bucketSpace, const ClusterState& c)
{
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
if (bucketSpace == FixedBucketSpaces::default_space()) {
_clusterState.reset(new ClusterState(c));
if (!_clusterState->nodeUp()) {
@@ -570,7 +570,7 @@ DummyPersistence::createIterator(const Bucket &b, FieldSetSP fs, const Selection
Iterator* it;
IteratorId id;
{
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
id = _nextIterator;
++_nextIterator;
assert(_iterators.find(id) == _iterators.end());
@@ -640,7 +640,7 @@ DummyPersistence::iterate(IteratorId id, uint64_t maxByteSize, Context& ctx) con
ctx.trace(9, "started iterate()");
Iterator* it;
{
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
std::map<IteratorId, Iterator::UP>::iterator iter(_iterators.find(id));
if (iter == _iterators.end()) {
return IterateResult(Result::ErrorType::PERMANENT_ERROR,
@@ -711,7 +711,7 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "destroyIterator(%" PRIu64 ")", uint64_t(id));
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
if (_iterators.find(id) != _iterators.end()) {
_iterators.erase(id);
}
@@ -724,7 +724,7 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "createBucket(%s)", b.toString().c_str());
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
if (_content[b.getPartition()].find(b) == _content[b.getPartition()].end()) {
_content[b.getPartition()][b] = std::make_shared<BucketContent>();
} else {
@@ -740,7 +740,7 @@ DummyPersistence::deleteBucket(const Bucket& b, Context&)
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "deleteBucket(%s)", b.toString().c_str());
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
if (_content[b.getPartition()][b].get()) {
assert(!_content[b.getPartition()][b]->_inUse);
}
@@ -904,7 +904,7 @@ DummyPersistence::dumpBucket(const Bucket& b) const
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(spam, "dumpBucket(%s)", b.toString().c_str());
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
PartitionContent::const_iterator it(_content[b.getPartition()].find(b));
if (it == _content[b.getPartition()].end()) {
return "DOESN'T EXIST";
@@ -924,7 +924,7 @@ DummyPersistence::isActive(const Bucket& b) const
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
LOG(spam, "isActive(%s)", b.toString().c_str());
PartitionContent::const_iterator it(_content[b.getPartition()].find(b));
if (it == _content[b.getPartition()].end()) {
@@ -942,7 +942,7 @@ BucketContentGuard::UP
DummyPersistence::acquireBucketWithLock(const Bucket& b, LockMode lock_mode) const
{
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
- vespalib::MonitorGuard lock(_monitor);
+ std::lock_guard lock(_monitor);
DummyPersistence& ncp(const_cast<DummyPersistence&>(*this));
PartitionContent::iterator it(ncp._content[b.getPartition()].find(b));
if (it == ncp._content[b.getPartition()].end()) {
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index c3a4991a590..5f9d2b6ddc3 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -11,10 +11,11 @@
#include <vespa/persistence/spi/abstractpersistenceprovider.h>
#include <vespa/document/base/globalid.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <atomic>
#include <map>
+#include <mutex>
+#include <condition_variable>
namespace document {
class DocumentTypeRepo;
@@ -207,7 +208,8 @@ private:
std::vector<PartitionContent> _content;
IteratorId _nextIterator;
mutable std::map<IteratorId, Iterator::UP> _iterators;
- vespalib::Monitor _monitor;
+ mutable std::mutex _monitor;
+ std::condition_variable _cond;
std::unique_ptr<ClusterState> _clusterState;
diff --git a/storage/src/tests/common/dummystoragelink.cpp b/storage/src/tests/common/dummystoragelink.cpp
index ab70bff3409..8d188002c67 100644
--- a/storage/src/tests/common/dummystoragelink.cpp
+++ b/storage/src/tests/common/dummystoragelink.cpp
@@ -63,12 +63,12 @@ bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd)
}
}
if (isBottom()) {
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
{
std::lock_guard guard(_lock);
_commands.push_back(cmd);
}
- lock.broadcast();
+ _waitCond.notify_all();
return true;
}
return StorageLink::onDown(cmd);
@@ -76,12 +76,12 @@ bool DummyStorageLink::onDown(const api::StorageMessage::SP& cmd)
bool DummyStorageLink::onUp(const api::StorageMessage::SP& reply) {
if (isTop()) {
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
{
std::lock_guard guard(_lock);
_replies.push_back(reply);
}
- lock.broadcast();
+ _waitCond.notify_all();
return true;
}
return StorageLink::onUp(reply);
@@ -96,7 +96,7 @@ void DummyStorageLink::injectReply(api::StorageReply* reply)
}
void DummyStorageLink::reset() {
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
std::lock_guard guard(_lock);
_commands.clear();
_replies.clear();
@@ -106,11 +106,10 @@ void DummyStorageLink::reset() {
void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
{
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
- vespalib::MonitorGuard lock(_waitMonitor);
+ vespalib::steady_time endTime = clock.getMonotonicTime() + vespalib::from_s(timeout);
+ std::unique_lock guard(_waitMonitor);
while (_commands.size() + _replies.size() < msgCount) {
- if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ if (timeout != 0 && clock.getMonotonicTime() > endTime) {
std::ostringstream ost;
ost << "Timed out waiting for " << msgCount << " messages to "
<< "arrive in dummy storage link. Only "
@@ -119,9 +118,9 @@ void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
if (timeout >= 0) {
- lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ _waitCond.wait_until(guard, endTime);
} else {
- lock.wait();
+ _waitCond.wait(guard);
}
}
}
@@ -129,9 +128,8 @@ void DummyStorageLink::waitForMessages(unsigned int msgCount, int timeout)
void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
{
framework::defaultimplementation::RealClock clock;
- framework::MilliSecTime endTime(
- clock.getTimeInMillis() + framework::MilliSecTime(timeout * 1000));
- vespalib::MonitorGuard lock(_waitMonitor);
+ vespalib::steady_time endTime = clock.getMonotonicTime() + vespalib::from_s(timeout);
+ std::unique_lock lock(_waitMonitor);
while (true) {
for (uint32_t i=0; i<_commands.size(); ++i) {
if (_commands[i]->getType() == type) return;
@@ -139,7 +137,7 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
for (uint32_t i=0; i<_replies.size(); ++i) {
if (_replies[i]->getType() == type) return;
}
- if (timeout != 0 && clock.getTimeInMillis() > endTime) {
+ if (timeout != 0 && clock.getMonotonicTime() > endTime) {
std::ostringstream ost;
ost << "Timed out waiting for " << type << " message to "
<< "arrive in dummy storage link. Only "
@@ -154,9 +152,9 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
if (timeout >= 0) {
- lock.wait((endTime - clock.getTimeInMillis()).getTime());
+ _waitCond.wait_until(lock, endTime);
} else {
- lock.wait();
+ _waitCond.wait(lock);
}
}
}
@@ -164,7 +162,7 @@ void DummyStorageLink::waitForMessage(const api::MessageType& type, int timeout)
api::StorageMessage::SP
DummyStorageLink::getAndRemoveMessage(const api::MessageType& type)
{
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
for (std::vector<api::StorageMessage::SP>::iterator it = _commands.begin();
it != _commands.end(); ++it)
{
diff --git a/storage/src/tests/common/dummystoragelink.h b/storage/src/tests/common/dummystoragelink.h
index 46a1d4ea25f..5a0f1ad96b9 100644
--- a/storage/src/tests/common/dummystoragelink.h
+++ b/storage/src/tests/common/dummystoragelink.h
@@ -2,7 +2,6 @@
#pragma once
-#include <vespa/vespalib/util/sync.h>
#include <list>
#include <sstream>
#include <vespa/storageapi/messageapi/storagecommand.h>
@@ -27,7 +26,8 @@ class DummyStorageLink : public StorageLink {
bool _useDispatch;
bool _ignore;
static DummyStorageLink* _last;
- vespalib::Monitor _waitMonitor;
+ std::mutex _waitMonitor;
+ std::condition_variable _waitCond;
public:
DummyStorageLink();
@@ -87,7 +87,7 @@ public:
{ return _replies; }
std::vector<api::StorageMessage::SP> getCommandsOnce() {
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
std::vector<api::StorageMessage::SP> retval;
{
std::lock_guard guard(_lock);
@@ -97,7 +97,7 @@ public:
}
std::vector<api::StorageMessage::SP> getRepliesOnce() {
- vespalib::MonitorGuard lock(_waitMonitor);
+ std::lock_guard lock(_waitMonitor);
std::vector<api::StorageMessage::SP> retval;
{
std::lock_guard guard(_lock);
diff --git a/storageserver/src/apps/storaged/storage.cpp b/storageserver/src/apps/storaged/storage.cpp
index 903c61875ed..428067e3059 100644
--- a/storageserver/src/apps/storaged/storage.cpp
+++ b/storageserver/src/apps/storaged/storage.cpp
@@ -51,12 +51,13 @@ Process::UP createProcess(vespalib::stringref configId) {
class StorageApp : public FastOS_Application,
private vespalib::ProgramOptions
{
- std::string _configId;
- bool _showSyntax;
- uint32_t _maxShutdownTime;
- int _lastSignal;
- vespalib::Monitor _signalLock;
- Process::UP _process;
+ std::string _configId;
+ bool _showSyntax;
+ uint32_t _maxShutdownTime;
+ int _lastSignal;
+ std::mutex _signalLock;
+ std::condition_variable _signalCond;
+ Process::UP _process;
public:
StorageApp();
@@ -64,11 +65,10 @@ public:
void handleSignal(int signal) {
LOG(info, "Got signal %d, waiting for lock", signal);
- vespalib::MonitorGuard sync(_signalLock);
-
+ std::lock_guard sync(_signalLock);
LOG(info, "Got lock for signal %d", signal);
_lastSignal = signal;
- sync.signal();
+ _signalCond.notify_one();
}
void handleSignals();
@@ -103,8 +103,7 @@ StorageApp::~StorageApp() = default;
bool StorageApp::Init()
{
FastOS_Application::Init();
- setCommandLineArguments(
- FastOS_Application::_argc, FastOS_Application::_argv);
+ setCommandLineArguments(FastOS_Application::_argc, FastOS_Application::_argv);
try{
parse();
} catch (vespalib::InvalidCommandLineArgumentsException& e) {
@@ -192,9 +191,9 @@ int StorageApp::Main()
ResumeGuard guard(_process->getNode().pause());
_process->updateConfig();
}
- // Wait until we get a kill signal.
- vespalib::MonitorGuard lock(_signalLock);
- lock.wait(1000ms);
+ // Wait until we get a kill signal.
+ std::unique_lock guard(_signalLock);
+ _signalCond.wait_for(guard, 1000ms);
handleSignals();
}
LOG(debug, "Server was attempted stopped, shutting down");
diff --git a/vdslib/CMakeLists.txt b/vdslib/CMakeLists.txt
index 3c1ee756e56..b66f53a4e19 100644
--- a/vdslib/CMakeLists.txt
+++ b/vdslib/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_define_module(
src/vespa/vdslib/container
src/vespa/vdslib/distribution
src/vespa/vdslib/state
- src/vespa/vdslib/thread
TEST_DEPENDS
vdstestlib
@@ -24,5 +23,4 @@ vespa_define_module(
src/tests/container
src/tests/distribution
src/tests/state
- src/tests/thread
)
diff --git a/vdslib/src/tests/CMakeLists.txt b/vdslib/src/tests/CMakeLists.txt
index 6cf1ba5e33f..0b48d675ddc 100644
--- a/vdslib/src/tests/CMakeLists.txt
+++ b/vdslib/src/tests/CMakeLists.txt
@@ -9,7 +9,6 @@ vespa_add_executable(vdslib_gtest_runner_app TEST
vdslib_containertest
vdslib_testdistribution
vdslib_teststate
- vdslib_testthread
GTest::GTest
)
diff --git a/vdslib/src/tests/thread/.gitignore b/vdslib/src/tests/thread/.gitignore
deleted file mode 100644
index 583460ae288..00000000000
--- a/vdslib/src/tests/thread/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-*.So
-.depend
-Makefile
diff --git a/vdslib/src/tests/thread/CMakeLists.txt b/vdslib/src/tests/thread/CMakeLists.txt
deleted file mode 100644
index bf2c8a41c9b..00000000000
--- a/vdslib/src/tests/thread/CMakeLists.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(vdslib_testthread
- SOURCES
- taskschedulertest.cpp
- DEPENDS
- vdslib
- GTest::GTest
-)
diff --git a/vdslib/src/tests/thread/taskschedulertest.cpp b/vdslib/src/tests/thread/taskschedulertest.cpp
deleted file mode 100644
index 54877fae62b..00000000000
--- a/vdslib/src/tests/thread/taskschedulertest.cpp
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vdslib/thread/taskscheduler.h>
-#include <vespa/vespalib/gtest/gtest.h>
-#include <thread>
-
-namespace vdslib {
-
-namespace {
-
-struct TestWatch : public TaskScheduler::Watch {
- mutable std::mutex _lock;
- uint64_t _time;
-
- TestWatch(uint64_t startTime = 0) : _time(startTime) {}
- ~TestWatch() = default;
-
- TaskScheduler::Time getTime() const override {
- std::lock_guard guard(_lock);
- return _time;
- }
-
- void increment(uint64_t ms) {
- std::lock_guard guard(_lock);
- _time += ms;
- }
-
- void set(uint64_t ms) {
- std::lock_guard guard(_lock);
- _time = ms;
- }
-};
-
-struct TestTask : public TaskScheduler::Task
-{
- TestWatch& _watch;
- uint64_t _executionTime;
- uint64_t _maxRuns;
- uint64_t _maxTime;
- int64_t _result;
- uint64_t _currentRuns;
- std::string _name;
- std::vector<std::string>* _register;
-
- TestTask(TestWatch& watch, uint64_t executionTime, uint64_t maxRuns,
- uint64_t maxTime, int64_t result)
- : _watch(watch), _executionTime(executionTime), _maxRuns(maxRuns),
- _maxTime(maxTime), _result(result), _currentRuns(0),
- _name(), _register(0)
- {
- }
-
- void registerCallsWithName(const std::string& name,
- std::vector<std::string>& myregister)
- {
- _name = name;
- _register = &myregister;
- }
-
- int64_t run(TaskScheduler::Time currentTime) override {
- // Emulate that we use time to run
- _watch.increment(_executionTime);
- if (_register != 0) {
- std::ostringstream ost;
- ost << currentTime;
- if (_name.size() > 0) {
- ost << " " << _name;
- }
- _register->push_back(ost.str());
- }
- // If max runs, dont run anymore
- if (++_currentRuns >= _maxRuns) {
- //std::cerr << "Max runs run, returning 0\n";
- return 0;
- }
- // If we will go beyond max time, dont run anymore
- if (_result > 0 && currentTime + _result > _maxTime) {
- //std::cerr << "Max time spent, returning 0\n";
- return 0;
- }
- //std::cerr << "Executed test task. Returning " << _result << "\n";
- return _result;
- }
-
-};
-
-std::string join(std::vector<std::string>& v) {
- std::ostringstream ost;
- for (size_t i=0; i<v.size(); ++i) {
- if (i != 0) ost << ",";
- ost << v[i];
- }
- return ost.str();
-}
-
-}
-
-TEST(TaskSchedulerTest, test_simple)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- scheduler.start(threadPool);
- std::vector<std::string> calls;
-
- // Test that one can schedule a single task immediately
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 5, 1000, 0));
- task->registerCallsWithName("", calls);
- scheduler.add(TestTask::UP(task));
- scheduler.waitForTaskCounterOfAtLeast(counter + 1);
- EXPECT_EQ(std::string("0"), join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
- // Test that task is repeated at intervals if wanted.
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 5, 1000, -20));
- task->registerCallsWithName("", calls);
- scheduler.add(TestTask::UP(task));
- for (uint32_t i = 1; i <= 5; ++i) {
- scheduler.waitForTaskCounterOfAtLeast(counter + i);
- watch.increment(100);
- }
- EXPECT_EQ(std::string("0,110,220,330,440"),
- join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
- // Test that task scheduled at specific time works, and that if
- // scheduled at specific time in the past/current, we're rerun at once.
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 4, 1000, 100));
- task->registerCallsWithName("", calls);
- scheduler.addAbsolute(TestTask::UP(task), 50);
- watch.increment(49); // Not yet time to run
- std::this_thread::sleep_for(5ms);
- // Check that it has not run yet..
- EXPECT_EQ(counter, scheduler.getTaskCounter());
- watch.increment(10); // Now time is enough for it to run
- scheduler.waitForTaskCounterOfAtLeast(counter + 1);
- watch.increment(10);
- std::this_thread::sleep_for(5ms);
- // Check that it has not run yet..
- EXPECT_EQ(counter + 1, scheduler.getTaskCounter());
- watch.increment(50);
- scheduler.waitForTaskCounterOfAtLeast(counter + 2);
- EXPECT_EQ(std::string("59,129,129,129"),
- join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
-}
-
-TEST(TaskSchedulerTest, test_multiple_tasks_at_same_time)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- std::vector<std::string> calls;
-
- // Test that tasks deleted before they are run are automatically
- // cancelled and removed from scheduler
- {
- TestTask* task1(new TestTask(watch, 10, 3, 1000, 10));
- TestTask* task2(new TestTask(watch, 10, 3, 1000, 10));
- task1->registerCallsWithName("task1", calls);
- task2->registerCallsWithName("task2", calls);
- watch.set(10);
- scheduler.add(TestTask::UP(task1));
- scheduler.add(TestTask::UP(task2));
- // Start threadpool after adding both, such that we ensure both
- // are added at the same time interval
- scheduler.start(threadPool);
-
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- std::ostringstream ost;
- for (size_t i=0; i<calls.size(); ++i) ost << calls[i] << "\n";
-
- EXPECT_EQ(std::string(
- "10 task1\n"
- "10 task2\n"
- "10 task1\n"
- "10 task2\n"
- "10 task1\n"
- "10 task2\n"
- ), ost.str());
- }
-}
-
-TEST(TaskSchedulerTest, test_remove_task)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- scheduler.start(threadPool);
- std::vector<std::string> calls;
-
- // Schedule a task, and remove it..
- {
- calls.clear();
- watch.set(0);
- TestTask* task(new TestTask(watch, 10, 5, 1000, 0));
- task->registerCallsWithName("", calls);
- scheduler.addAbsolute(TestTask::UP(task), 50);
- // Remove actual task
- scheduler.remove(task);
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- // Remove non-existing task
- task = new TestTask(watch, 10, 5, 1000, 0);
- scheduler.remove(task);
- delete task;
- // Time should not be advanced as task didn't get to run
- EXPECT_EQ(0, (int) watch.getTime());
- }
-}
-
-}
diff --git a/vdslib/src/vespa/vdslib/CMakeLists.txt b/vdslib/src/vespa/vdslib/CMakeLists.txt
index cf5053a5ceb..ea19664a45f 100644
--- a/vdslib/src/vespa/vdslib/CMakeLists.txt
+++ b/vdslib/src/vespa/vdslib/CMakeLists.txt
@@ -4,7 +4,6 @@ vespa_add_library(vdslib
$<TARGET_OBJECTS:vdslib_container>
$<TARGET_OBJECTS:vdslib_state>
$<TARGET_OBJECTS:vdslib_distribution>
- $<TARGET_OBJECTS:vdslib_thread>
INSTALL lib64
DEPENDS
)
diff --git a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt b/vdslib/src/vespa/vdslib/thread/CMakeLists.txt
deleted file mode 100644
index 656772afc49..00000000000
--- a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(vdslib_thread OBJECT
- SOURCES
- taskscheduler.cpp
- DEPENDS
-)
diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp b/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp
deleted file mode 100644
index 08c7b80e406..00000000000
--- a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "taskscheduler.h"
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <sys/time.h>
-
-namespace vdslib {
-
-uint64_t
-TaskScheduler::Watch::getTime() const
-{
- struct timeval mytime;
- gettimeofday(&mytime, 0);
- return mytime.tv_sec * 1000llu + mytime.tv_usec / 1000;
-}
-
-TaskScheduler::TaskScheduler()
- : _lock(),
- _defaultWatch(),
- _watch(&_defaultWatch),
- _tasks(),
- _currentRunningTasks(),
- _taskCounter(0)
-{
-}
-
-bool TaskScheduler::onStop()
-{
- vespalib::MonitorGuard guard(_lock);
- guard.broadcast();
- return true;
-}
-
-TaskScheduler::~TaskScheduler()
-{
- stop();
- {
- vespalib::MonitorGuard guard(_lock);
- guard.broadcast();
- }
- join();
- for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end(); ++it) {
- TaskVector & v(it->second);
- for (TaskVector::iterator it2 = v.begin(); it2 != v.end(); ++it2) {
- delete *it2;
- }
- }
-}
-
-void
-TaskScheduler::add(Task::UP task)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[_watch->getTime()]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-void
-TaskScheduler::addRelative(Task::UP task, Time timeDiff)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[_watch->getTime() + timeDiff]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-void
-TaskScheduler::addAbsolute(Task::UP task, Time time)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[time]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-namespace {
- template<typename T>
- bool contains(const std::vector<T>& source, const T& element) {
- for (size_t i = 0, n = source.size(); i<n; ++i) {
- if (source[i] == element) return true;
- }
- return false;
- }
-
- template<typename T>
- void erase(std::vector<T>& source, const T& element) {
- std::vector<T> result;
- result.reserve(source.size());
- for (size_t i = 0, n = source.size(); i<n; ++i) {
- if (source[i] != element) result.push_back(source[i]);
- }
- result.swap(source);
- }
-}
-
-void
-TaskScheduler::remove(Task* task)
-{
- vespalib::MonitorGuard guard(_lock);
- while (contains(_currentRunningTasks, task)) {
- guard.wait();
- }
- for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end();) {
- if (contains(it->second, task)) {
- erase(it->second, task);
- if (it->second.size() == 0) _tasks.erase(it);
- delete task;
- break;
- }
- ++it;
- }
-}
-
-void
-TaskScheduler::setWatch(const Watch& watch)
-{
- vespalib::MonitorGuard guard(_lock);
- _watch = &watch;
-}
-
-TaskScheduler::Time
-TaskScheduler::getTime() const
-{
- vespalib::MonitorGuard guard(_lock);
- return _watch->getTime();
-}
-
-uint64_t
-TaskScheduler::getTaskCounter() const
-{
- vespalib::MonitorGuard guard(_lock);
- return _taskCounter;
-}
-
-void
-TaskScheduler::waitForTaskCounterOfAtLeast(uint64_t taskCounter,
- uint64_t timeout) const
-{
- vespalib::MonitorGuard guard(_lock);
- uint64_t currentTime = _defaultWatch.getTime();
- uint64_t endTime = currentTime + timeout;
- while (_taskCounter < taskCounter) {
- if (endTime <= currentTime) {
- vespalib::asciistream ost;
- ost << "Task scheduler not reached task counter of " << taskCounter
- << " within timeout of " << timeout << " ms. Current task"
- << " counter is " << _taskCounter;
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- guard.wait(endTime - currentTime);
- currentTime = _defaultWatch.getTime();
- }
-}
-
-void
-TaskScheduler::waitUntilNoTasksRemaining(uint64_t timeout) const
-{
- vespalib::MonitorGuard guard(_lock);
- uint64_t currentTime = _defaultWatch.getTime();
- uint64_t endTime = currentTime + timeout;
- while (_tasks.size() > 0 || _currentRunningTasks.size() > 0) {
- if (endTime <= currentTime) {
- vespalib::asciistream ost;
- ost << "Task scheduler still have tasks scheduled after timeout"
- << " of " << timeout << " ms. There are " << _tasks.size()
- << " entries in tasks map and " << _currentRunningTasks.size()
- << " tasks currently scheduled to run.";
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- guard.wait(endTime - currentTime);
- currentTime = _defaultWatch.getTime();
- }
-}
-
-void
-TaskScheduler::run()
-{
- while (1) {
- vespalib::MonitorGuard guard(_lock);
- if (!running()) return;
- Time time = _watch->getTime();
- TaskMap::iterator next = _tasks.begin();
- if (next == _tasks.end()) {
- guard.wait();
- continue;
- }
- if (next->first > time) {
- guard.wait(next->first - time);
- continue;
- }
- TaskVector taskList(next->second);
- _currentRunningTasks.swap(next->second);
- _tasks.erase(next);
- guard.unlock();
- for (size_t i=0; i<taskList.size(); ++i) {
- int64_t result = taskList[i]->run(time);
- if (result < 0) {
- addAbsolute(Task::UP(taskList[i]),
- time + (-1 * result));
- } else if (result > 0) {
- if (static_cast<Time>(result) <= time) {
- taskList.push_back(taskList[i]);
- } else {
- addAbsolute(Task::UP(taskList[i]), result);
- }
- } else {
- delete taskList[i];
- }
- }
- vespalib::MonitorGuard guard2(_lock);
- if (!running()) return;
- _taskCounter += _currentRunningTasks.size();
- _currentRunningTasks.clear();
- guard2.broadcast();
- }
-}
-
-} // vdslib
diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.h b/vdslib/src/vespa/vdslib/thread/taskscheduler.h
deleted file mode 100644
index b34d633e624..00000000000
--- a/vdslib/src/vespa/vdslib/thread/taskscheduler.h
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class vespalib::TaskScheduler
- * \ingroup util
- *
- * \brief Class to register tasks in to get them run in a separate thread.
- *
- * Imported to vdslib as C++ document API needs an independent thread to run
- * events in, as it was subject to errors to use FNET event thread. Converted
- * this class used in storage API client code before.
- */
-#pragma once
-
-#include <vespa/vespalib/util/document_runnable.h>
-#include <map>
-#include <memory>
-#include <vector>
-#include <vespa/vespalib/util/sync.h>
-
-namespace vdslib {
-
-class TaskScheduler : public document::Runnable
-{
-public:
- typedef uint64_t Time;
-
- struct Task {
- typedef std::unique_ptr<Task> UP;
- virtual ~Task() {}
- /**
- * Return 0 to unregister this task. Return a negative number to get a
- * new callback in that many (times -1) milliseconds. Return a positive
- * number to get a callback as soon as thread is available after that
- * absolute point in time (in milliseconds). If returning current time
- * or before, this task will be scheduled to be rerun immediately
- * (after other already waiting tasks have had a chance to run).
- * The current time for the scheduler is given to the task.
- */
- virtual int64_t run(Time) = 0;
- };
-
- /**
- * If you want to fake time (useful for testing), implement your own watch
- * for the scheduler to use.
- */
- struct Watch {
- virtual ~Watch() {}
- virtual Time getTime() const; // In ms since 1970
- };
-
- /** Creates a task scheduler. Remember to call start() to get it going. */
- TaskScheduler();
- ~TaskScheduler();
-
- /** Register a task for immediate execution */
- void add(Task::UP);
-
- /** Register a task to be run in a given number of milliseconds from now */
- void addRelative(Task::UP, Time);
-
- /** Register a task to be run at given absolute time in milliseconds */
- void addAbsolute(Task::UP, Time);
-
- /**
- * Removes a scheduled task from the scheduler. Note that this is
- * currently not efficiently implemented but an exhaustive iteration of
- * current tasks. Assuming number of tasks is small so this doesn't matter.
- * If current task is running while this is called, function will block
- * until it has completed before removing it. (To be safe if you want to
- * delete task after.)
- */
- void remove(Task*);
-
- /** Get the schedulers current time. */
- Time getTime() const;
-
- /** Set a custom watch to be used for this scheduler (Useful for testing) */
- void setWatch(const Watch& watch);
-
- /** Returns a number of current task */
- uint64_t getTaskCounter() const;
-
- /** Wait until a given number of tasks have been completed */
- void waitForTaskCounterOfAtLeast(uint64_t taskCounter,
- uint64_t timeout = 5000) const;
-
- /** Call this to wait until no tasks are scheduled (Useful for testing) */
- void waitUntilNoTasksRemaining(uint64_t timeout = 5000) const;
-
-private:
- typedef std::vector<Task*> TaskVector;
- typedef std::map<Time, TaskVector> TaskMap;
- vespalib::Monitor _lock;
- Watch _defaultWatch;
- const Watch* _watch;
- TaskMap _tasks;
- TaskVector _currentRunningTasks;
- uint64_t _taskCounter;
-
- void run() override;
- bool onStop() override;
-};
-
-} // vdslib
-