summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-15 18:13:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2023-05-15 18:13:33 +0000
commit4cd704fd36d9016de6ed2caff9253847144a3330 (patch)
tree54006a5d9e4b2fb5a9c749ae27b729f8e4e19fe7 /searchcore
parentc2a220bed85e7af09f62d34de339b168d9507b87 (diff)
Use multiple threads for pruning the session caches.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp81
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp4
3 files changed, 58 insertions, 29 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
index 1e74d83185f..f00ce4ab00c 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
@@ -3,6 +3,9 @@
#include "sessionmanager.h"
#include <vespa/vespalib/stllike/lrucache_map.hpp>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/foreground_thread_executor.h>
#include <mutex>
#include <algorithm>
@@ -50,10 +53,6 @@ struct SessionCache : SessionCacheBase {
}
return ret;
}
- void pruneTimedOutSessions(vespalib::steady_time currentTime) {
- std::vector<EntryUP> toDestruct = stealTimedOutSessions(currentTime);
- toDestruct.clear();
- }
std::vector<EntryUP> stealTimedOutSessions(vespalib::steady_time currentTime) {
std::vector<EntryUP> toDestruct;
std::lock_guard<std::mutex> guard(_lock);
@@ -103,10 +102,6 @@ struct SessionMap : SessionCacheBase {
}
return EntrySP();
}
- void pruneTimedOutSessions(vespalib::steady_time currentTime) {
- std::vector<EntrySP> toDestruct = stealTimedOutSessions(currentTime);
- toDestruct.clear();
- }
std::vector<EntrySP> stealTimedOutSessions(vespalib::steady_time currentTime) {
std::vector<EntrySP> toDestruct;
std::vector<SessionId> keys;
@@ -151,7 +146,8 @@ struct SessionMap : SessionCacheBase {
}
};
-void SessionCacheBase::entryDropped(const SessionId &id) {
+void
+SessionCacheBase::entryDropped(const SessionId &id) {
LOG(debug, "Session cache is full, dropping entry to fit session '%s'", id.c_str());
_stats.numDropped++;
}
@@ -179,19 +175,23 @@ SessionManager::~SessionManager() {
assert(_search_map->empty());
}
-void SessionManager::insert(search::grouping::GroupingSession::UP session) {
+void
+SessionManager::insert(search::grouping::GroupingSession::UP session) {
_grouping_cache->insert(std::move(session));
}
-void SessionManager::insert(SearchSession::SP session) {
+void
+SessionManager::insert(SearchSession::SP session) {
_search_map->insert(std::move(session));
}
-GroupingSession::UP SessionManager::pickGrouping(const SessionId &id) {
+GroupingSession::UP
+SessionManager::pickGrouping(const SessionId &id) {
return _grouping_cache->pick(id);
}
-SearchSession::SP SessionManager::pickSearch(const SessionId &id) {
+SearchSession::SP
+SessionManager::pickSearch(const SessionId &id) {
return _search_map->pick(id);
}
@@ -199,33 +199,58 @@ std::vector<SessionManager::SearchSessionInfo>
SessionManager::getSortedSearchSessionInfo() const
{
std::vector<SearchSessionInfo> sessions;
- _search_map->each([&sessions](const SearchSession &session)
- {
- sessions.emplace_back(session.getSessionId(),
- session.getCreateTime(),
- session.getTimeOfDoom());
- });
+ _search_map->each([&sessions](const SearchSession &session) {
+ sessions.emplace_back(session.getSessionId(), session.getCreateTime(), session.getTimeOfDoom());
+ });
std::sort(sessions.begin(), sessions.end(),
- [](const SearchSessionInfo &a,
- const SearchSessionInfo &b)
- {
+ [](const SearchSessionInfo &a, const SearchSessionInfo &b) {
return (a.created < b.created);
});
return sessions;
}
-void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) {
- _grouping_cache->pruneTimedOutSessions(currentTime);
- _search_map->pruneTimedOutSessions(currentTime);
+void
+SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) {
+ vespalib::ForegroundThreadExecutor executor;
+ pruneTimedOutSessions(currentTime, executor);
+}
+
+namespace {
+
+template <typename T>
+void
+split_and_execute(std::vector<T> tasks, vespalib::ThreadExecutor & executor) {
+ size_t num_bundles = std::min(tasks.size(), 2*executor.getNumThreads());
+ std::vector<std::vector<T>> bundles(num_bundles);
+ for (size_t i = 0; i < tasks.size(); i++) {
+ bundles[i%bundles.size()].push_back(std::move(tasks[i]));
+ }
+ for (size_t i = 0; i < bundles.size(); i++) {
+ executor.execute(vespalib::makeLambdaTask([part=std::move(bundles[i])]() {
+ // Objects will be destructed in the given executor;
+ }));
+ }
+}
+
+}
+void
+SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) {
+ auto groupings = _grouping_cache->stealTimedOutSessions(currentTime);
+ auto queries = _search_map->stealTimedOutSessions(currentTime);
+ split_and_execute(std::move(groupings), executor);
+ split_and_execute(std::move(queries), executor);
}
-SessionManager::Stats SessionManager::getGroupingStats() {
+SessionManager::Stats
+SessionManager::getGroupingStats() {
return _grouping_cache->getStats();
}
-SessionManager::Stats SessionManager::getSearchStats() {
+SessionManager::Stats
+SessionManager::getSearchStats() {
return _search_map->getStats();
}
-size_t SessionManager::getNumSearchSessions() const {
+size_t
+SessionManager::getNumSearchSessions() const {
return _search_map->size();
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
index 3ed4760b52e..5c881aeb6fd 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
@@ -6,6 +6,7 @@
#include <vespa/searchcore/grouping/sessionid.h>
#include <vespa/vespalib/stllike/lrucache_map.h>
+namespace vespalib { class ThreadExecutor; }
namespace proton::matching {
using SessionId = vespalib::string;
@@ -59,6 +60,7 @@ public:
std::vector<SearchSessionInfo> getSortedSearchSessionInfo() const;
void pruneTimedOutSessions(vespalib::steady_time currentTime);
+ void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor);
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 7f3dc02aba8..d70bff52ed4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -380,7 +380,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_flushEngine->start();
vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval);
- _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval);
+ _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() {
+ _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now(), _shared_service->shared());
+ }), pruneSessionsInterval, pruneSessionsInterval);
_isInitializing = false;
_protonConfigurer.setAllowReconfig(true);
_initComplete = true;