aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-21 18:59:31 +0100
committerGitHub <noreply@github.com>2022-02-21 18:59:31 +0100
commit9d83cff2851b2126700b1b3cf7b4080ac7c8fb81 (patch)
tree49b08e0b66fdec37bce82643b5c310db0cb41a7b /staging_vespalib/src
parenta7e8bb9dcf3c674a3756e0f0383384593856415a (diff)
Revert "Use a common FNET_Transport owned by Proton in both SceduledExecutor …"
Diffstat (limited to 'staging_vespalib/src')
-rw-r--r--staging_vespalib/src/tests/timer/timer_test.cpp40
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp29
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h11
3 files changed, 46 insertions, 34 deletions
diff --git a/staging_vespalib/src/tests/timer/timer_test.cpp b/staging_vespalib/src/tests/timer/timer_test.cpp
index 1f0ee81e4e6..9d04500b8cd 100644
--- a/staging_vespalib/src/tests/timer/timer_test.cpp
+++ b/staging_vespalib/src/tests/timer/timer_test.cpp
@@ -2,15 +2,18 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/scheduledexecutor.h>
-#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/fastos/thread.h>
using namespace vespalib;
using vespalib::Executor;
typedef Executor::Task Task;
-namespace {
+class Test : public TestApp
+{
+public:
+ int Main() override;
+ void testScheduling();
+ void testReset();
+};
class TestTask : public Task {
private:
@@ -20,36 +23,35 @@ public:
void run() override { _latch.countDown(); }
};
+int
+Test::Main()
+{
+ TEST_INIT("timer_test");
+ testScheduling();
+ testReset();
+ TEST_DONE();
}
-TEST("testScheduling") {
+void Test::testScheduling()
+{
vespalib::CountDownLatch latch1(3);
vespalib::CountDownLatch latch2(2);
- FastOS_ThreadPool threadPool(64_Ki);
- FNET_Transport transport;
- transport.Start(&threadPool);
- ScheduledExecutor timer(transport);
+ ScheduledExecutor timer;
timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
EXPECT_TRUE(latch1.await(60s));
EXPECT_TRUE(latch2.await(60s));
- timer.reset();
- transport.ShutDown(true);
}
-TEST("testReset") {
+void Test::testReset()
+{
vespalib::CountDownLatch latch1(2);
- FastOS_ThreadPool threadPool(64_Ki);
- FNET_Transport transport;
- transport.Start(&threadPool);
- ScheduledExecutor timer(transport);
+ ScheduledExecutor timer;
timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
timer.reset();
EXPECT_TRUE(!latch1.await(3s));
timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
EXPECT_TRUE(latch1.await(60s));
- timer.reset();
- transport.ShutDown(true);
}
-TEST_MAIN() { TEST_RUN_ALL(); }
+TEST_APPHOOK(Test)
diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp
index 99254240f3c..f5384999cb8 100644
--- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp
@@ -3,10 +3,11 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/task.h>
#include <vespa/fnet/transport.h>
+#include <vespa/vespalib/util/size_literals.h>
namespace vespalib {
-using Task = vespalib::Executor::Task;
+typedef vespalib::Executor::Task Task;
class TimerTask : public FNET_Task
{
@@ -15,8 +16,8 @@ private:
TimerTask&operator=(const TimerTask &);
FNET_Scheduler *_scheduler;
- Task::UP _task;
- duration _interval;
+ Task::UP _task;
+ duration _interval;
public:
TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval)
: FNET_Task(scheduler),
@@ -34,15 +35,21 @@ public:
}
};
-ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)
- : _transport(transport),
+ScheduledExecutor::ScheduledExecutor()
+ : _threadPool(128_Ki),
+ _transport(new FNET_Transport()),
_lock(),
_taskList()
-{ }
+{
+ _transport->Start(&_threadPool);
+}
ScheduledExecutor::~ScheduledExecutor()
{
- reset();
+ std::lock_guard guard(_lock);
+ _transport->ShutDown(true);
+ _threadPool.Close();
+ _taskList.clear();
}
@@ -50,7 +57,7 @@ void
ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval)
{
std::lock_guard guard(_lock);
- auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval);
+ TimerTaskPtr tTask(new TimerTask(_transport->GetScheduler(), std::move(task), interval));
_taskList.push_back(std::move(tTask));
_taskList.back()->Schedule(to_s(delay));
}
@@ -59,10 +66,10 @@ void
ScheduledExecutor::reset()
{
std::lock_guard guard(_lock);
- for (auto & task : _taskList) {
- task->Unschedule();
- }
+ _transport->ShutDown(true);
_taskList.clear();
+ _transport = std::make_unique<FNET_Transport>();
+ _transport->Start(&_threadPool);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h
index 1be49f6ff45..dcc16a8c379 100644
--- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h
@@ -3,6 +3,7 @@
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/fastos/thread.h>
#include <vector>
class FNET_Transport;
@@ -19,8 +20,10 @@ class TimerTask;
class ScheduledExecutor
{
private:
- using TaskList = std::vector<std::unique_ptr<TimerTask>>;
- FNET_Transport & _transport;
+ typedef std::unique_ptr<TimerTask> TimerTaskPtr;
+ typedef std::vector<TimerTaskPtr> TaskList;
+ FastOS_ThreadPool _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
std::mutex _lock;
TaskList _taskList;
@@ -28,7 +31,7 @@ public:
/**
* Create a new timer, capable of scheduling tasks at fixed intervals.
*/
- ScheduledExecutor(FNET_Transport & transport);
+ ScheduledExecutor();
/**
* Destroys this timer, finishing the current task executing and then
@@ -43,7 +46,7 @@ public:
* @param delay The delay to wait before first execution.
* @param interval The interval in seconds.
*/
- void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval);
+ void scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval);
/**
* Reset timer, clearing the list of task to execute.