summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-12-15 11:05:00 +0000
committerGeir Storli <geirst@verizonmedia.com>2020-12-15 11:06:47 +0000
commitf31f8dffef680bef44e56d2c5e22900533ab272f (patch)
treedc72fbe4de83f8fbc235bda85a24f244e80a1ea8
parent6f479ad61a4a6d973ce6a985e25206e9adfdcfee (diff)
Add explorer for the ExecutorThreadingService used in a document database.
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp122
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h25
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h3
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h1
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp9
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
-rw-r--r--vespalib/src/vespa/vespalib/util/threadexecutor.h5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h2
16 files changed, 206 insertions, 13 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 57445775df3..81dd8a64a6c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -36,6 +36,7 @@ vespa_add_library(searchcore_server STATIC
documentsubdbcollection.cpp
emptysearchview.cpp
executor_thread_service.cpp
+ executor_threading_service_explorer.cpp
executorthreadingservice.cpp
fast_access_document_retriever.cpp
fast_access_doc_subdb.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp
index 0c3772cab5b..a5c3e8d6078 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp
@@ -1,12 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "document_db_explorer.h"
-
#include "document_meta_store_read_guards.h"
#include "document_subdb_collection_explorer.h"
+#include "executor_threading_service_explorer.h"
#include "maintenance_controller_explorer.h"
-#include <vespa/searchcore/proton/common/state_reporter_utils.h>
#include <vespa/searchcore/proton/bucketdb/bucket_db_explorer.h>
+#include <vespa/searchcore/proton/common/state_reporter_utils.h>
#include <vespa/searchcore/proton/matching/session_manager_explorer.h>
#include <vespa/vespalib/data/slime/slime.h>
@@ -42,6 +42,7 @@ DocumentDBExplorer::get_state(const Inserter &inserter, bool full) const
}
const vespalib::string SUB_DB = "subdb";
+const vespalib::string THREADING_SERVICE = "threadingservice";
const vespalib::string BUCKET_DB = "bucketdb";
const vespalib::string MAINTENANCE_CONTROLLER = "maintenancecontroller";
const vespalib::string SESSION = "session";
@@ -49,25 +50,24 @@ const vespalib::string SESSION = "session";
std::vector<vespalib::string>
DocumentDBExplorer::get_children_names() const
{
- return {SUB_DB, BUCKET_DB, MAINTENANCE_CONTROLLER, SESSION};
+ return {SUB_DB, THREADING_SERVICE, BUCKET_DB, MAINTENANCE_CONTROLLER, SESSION};
}
std::unique_ptr<StateExplorer>
DocumentDBExplorer::get_child(vespalib::stringref name) const
{
if (name == SUB_DB) {
- return std::unique_ptr<StateExplorer>
- (new DocumentSubDBCollectionExplorer(_docDb->getDocumentSubDBs()));
+ return std::make_unique<DocumentSubDBCollectionExplorer>(_docDb->getDocumentSubDBs());
+ } else if (name == THREADING_SERVICE) {
+ return std::make_unique<ExecutorThreadingServiceExplorer>(_docDb->getWriteService());
} else if (name == BUCKET_DB) {
// TODO(geirst): const_cast can be avoided if we add const guard to BucketDBOwner.
- return std::unique_ptr<StateExplorer>(new BucketDBExplorer(
- (const_cast<DocumentSubDBCollection &>(_docDb->getDocumentSubDBs())).getBucketDB().takeGuard()));
+ return std::make_unique<BucketDBExplorer>(
+ (const_cast<DocumentSubDBCollection &>(_docDb->getDocumentSubDBs())).getBucketDB().takeGuard());
} else if (name == MAINTENANCE_CONTROLLER) {
- return std::unique_ptr<StateExplorer>
- (new MaintenanceControllerExplorer(_docDb->getMaintenanceController().getJobList()));
+ return std::make_unique<MaintenanceControllerExplorer>(_docDb->getMaintenanceController().getJobList());
} else if (name == SESSION) {
- return std::unique_ptr<StateExplorer>
- (new matching::SessionManagerExplorer(_docDb->session_manager()));
+ return std::make_unique<matching::SessionManagerExplorer>(_docDb->session_manager());
}
return std::unique_ptr<StateExplorer>(nullptr);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index 24f5fa9a5e6..49dc038096c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -81,6 +81,10 @@ void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) {
_executor.setTaskLimit(taskLimit);
}
+uint32_t ExecutorThreadService::getTaskLimit() const {
+ return _executor.getTaskLimit();
+}
+
void
ExecutorThreadService::wakeup() {
_executor.wakeup();
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index 4f27a8f86c2..325c36b7270 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -39,6 +39,7 @@ public:
size_t getNumThreads() const override { return _executor.getNumThreads(); }
void setTaskLimit(uint32_t taskLimit) override;
+ uint32_t getTaskLimit() const override;
void wakeup() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp
new file mode 100644
index 00000000000..0ecdca54e27
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp
@@ -0,0 +1,122 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "executor_threading_service_explorer.h"
+#include "executorthreadingservice.h"
+#include <vespa/vespalib/data/slime/cursor.h>
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+
+using vespalib::AdaptiveSequencedExecutor;
+using vespalib::BlockingThreadStackExecutor;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
+using vespalib::SingleExecutor;
+using vespalib::SyncableThreadExecutor;
+using vespalib::ThreadStackExecutor;
+using vespalib::slime::Cursor;
+
+namespace proton {
+
+namespace {
+
+void
+set_type(Cursor& object, const vespalib::string& type)
+{
+ object.setString("type", type);
+}
+
+void
+convert_syncable_executor_to_slime(const SyncableThreadExecutor& executor, const vespalib::string& type, Cursor& object)
+{
+ set_type(object, type);
+ object.setLong("num_threads", executor.getNumThreads());
+ object.setLong("task_limit", executor.getTaskLimit());
+}
+
+void
+convert_single_executor_to_slime(const SingleExecutor& executor, Cursor& object)
+{
+ convert_syncable_executor_to_slime(executor, "SingleExecutor", object);
+ object.setLong("watermark", executor.get_watermark());
+ object.setDouble("reaction_time_sec", vespalib::to_s(executor.get_reaction_time()));
+}
+
+void
+convert_executor_to_slime(const SyncableThreadExecutor* executor, Cursor& object)
+{
+ if (executor == nullptr) {
+ return;
+ }
+ if (const auto* single = dynamic_cast<const SingleExecutor*>(executor)) {
+ convert_single_executor_to_slime(*single, object);
+ } else if (const auto* blocking = dynamic_cast<const BlockingThreadStackExecutor*>(executor)) {
+ convert_syncable_executor_to_slime(*blocking, "BlockingThreadStackExecutor", object);
+ } else if (const auto* thread = dynamic_cast<const ThreadStackExecutor*>(executor)) {
+ convert_syncable_executor_to_slime(*thread, "ThreadStackExecutor", object);
+ } else {
+ convert_syncable_executor_to_slime(*executor, "SyncableThreadExecutor", object);
+ }
+}
+
+void
+convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object)
+{
+ set_type(object, "SequencedTaskExecutor");
+ object.setLong("num_executors", executor.getNumExecutors());
+ convert_executor_to_slime(executor.first_executor(), object.setObject("executor"));
+}
+
+void
+convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object)
+{
+ set_type(object, "AdaptiveSequencedExecutor");
+ object.setLong("num_strands", executor.getNumExecutors());
+ auto cfg = executor.get_config();
+ object.setLong("num_threads", cfg.num_threads);
+ object.setLong("max_waiting", cfg.max_waiting);
+ object.setLong("max_pending", cfg.max_pending);
+ object.setLong("wakeup_limit", cfg.wakeup_limit);
+}
+
+void
+convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object)
+{
+ if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) {
+ convert_sequenced_executor_to_slime(*seq, object);
+ } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) {
+ convert_adaptive_executor_to_slime(*ada, object);
+ } else {
+ set_type(object, "ISequencedTaskExecutor");
+ object.setLong("num_executors", executor->getNumExecutors());
+ }
+}
+
+}
+
+ExecutorThreadingServiceExplorer::ExecutorThreadingServiceExplorer(ExecutorThreadingService& service)
+ : _service(service)
+{
+}
+
+ExecutorThreadingServiceExplorer::~ExecutorThreadingServiceExplorer() = default;
+
+void
+ExecutorThreadingServiceExplorer::get_state(const vespalib::slime::Inserter& inserter, bool full) const
+{
+ auto& object = inserter.insertObject();
+ if (full) {
+ convert_executor_to_slime(&_service.getMasterExecutor(), object.setObject("master"));
+ convert_executor_to_slime(&_service.getIndexExecutor(), object.setObject("index"));
+ convert_executor_to_slime(&_service.getSummaryExecutor(), object.setObject("summary"));
+ convert_executor_to_slime(&_service.indexFieldInverter(), object.setObject("index_field_inverter"));
+ convert_executor_to_slime(&_service.indexFieldWriter(), object.setObject("index_field_writer"));
+ convert_executor_to_slime(&_service.attributeFieldWriter(), object.setObject("attribute_field_writer"));
+ }
+}
+
+}
+
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h
new file mode 100644
index 00000000000..14d25add8d6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h
@@ -0,0 +1,25 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/net/state_explorer.h>
+
+namespace proton {
+
+class ExecutorThreadingService;
+
+/**
+ * Class used to explore the state of the ExecutorThreadingService used in a document database.
+ */
+class ExecutorThreadingServiceExplorer : public vespalib::StateExplorer {
+private:
+ ExecutorThreadingService& _service;
+
+public:
+ ExecutorThreadingServiceExplorer(ExecutorThreadingService& service);
+ ~ExecutorThreadingServiceExplorer();
+
+ void get_state(const vespalib::slime::Inserter& inserter, bool full) const override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index 350673b9c90..0b1b9730363 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -48,6 +48,10 @@ public:
_service.setTaskLimit(taskLimit);
}
+ uint32_t getTaskLimit() const override {
+ return _service.getTaskLimit();
+ }
+
void wakeup() override {
_service.wakeup();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 407129199e3..e4b64b19739 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -334,4 +334,11 @@ AdaptiveSequencedExecutor::getStats()
return stats;
}
+AdaptiveSequencedExecutor::Config
+AdaptiveSequencedExecutor::get_config() const
+{
+ auto guard = std::lock_guard(_mutex);
+ return _cfg;
+}
+
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index c52b9b22245..ed2209d130a 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -123,7 +123,7 @@ private:
};
std::unique_ptr<ThreadTools> _thread_tools;
- std::mutex _mutex;
+ mutable std::mutex _mutex;
std::vector<Strand> _strands;
vespalib::ArrayQueue<Strand*> _wait_queue;
vespalib::ArrayQueue<Worker*> _worker_stack;
@@ -149,6 +149,7 @@ public:
void sync() override;
void setTaskLimit(uint32_t task_limit) override;
vespalib::ExecutorStats getStats() override;
+ Config get_config() const;
};
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index ff523b1e35d..1862de910ab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -26,6 +26,7 @@ public:
return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
}
void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
+ uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); }
void wakeup() override { }
};
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index d1c6b1aba53..b0c67e14c5b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -123,4 +123,13 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
return ExecutorId(executorId);
}
+const vespalib::SyncableThreadExecutor*
+SequencedTaskExecutor::first_executor() const
+{
+ if (_executors->empty()) {
+ return nullptr;
+ }
+ return _executors->front().get();
+}
+
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 050b00ef011..496b183af8a 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -41,6 +41,8 @@ public:
*/
uint32_t getComponentHashSize() const { return _component2Id.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
+ const vespalib::SyncableThreadExecutor* first_executor() const;
+
private:
explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 58cec52b2b0..721df3bf881 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -26,7 +26,9 @@ public:
SingleExecutor & sync() override;
void wakeup() override;
size_t getNumThreads() const override;
- uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
+ uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); }
+ uint32_t get_watermark() const { return _watermark; }
+ duration get_reaction_time() const { return _reactionTime; }
Stats getStats() override;
SingleExecutor & shutdown() override;
private:
diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h
index 61a5d9d5ac7..7d06007d7be 100644
--- a/vespalib/src/vespa/vespalib/util/threadexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h
@@ -32,6 +32,11 @@ public:
* Sets a new upper limit for accepted number of tasks.
*/
virtual void setTaskLimit(uint32_t taskLimit) = 0;
+
+ /**
+ * Gets the limit for accepted number of tasks.
+ */
+ virtual uint32_t getTaskLimit() const = 0;
};
/**
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 28ee2da2d8c..01546b80d66 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -174,6 +174,13 @@ ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit)
internalSetTaskLimit(taskLimit);
}
+uint32_t
+ThreadStackExecutorBase::getTaskLimit() const
+{
+ unique_lock guard(_lock);
+ return _taskLimit;
+}
+
void
ThreadStackExecutorBase::wakeup() {
// Nothing to do here as workers are always attentive.
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index c86597ca153..c0653c19516 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -211,6 +211,8 @@ public:
size_t getNumThreads() const override;
void setTaskLimit(uint32_t taskLimit) override;
+ uint32_t getTaskLimit() const override;
+
void wakeup() override;
/**