diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-15 18:13:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-15 18:13:33 +0000 |
commit | 4cd704fd36d9016de6ed2caff9253847144a3330 (patch) | |
tree | 54006a5d9e4b2fb5a9c749ae27b729f8e4e19fe7 /searchcore | |
parent | c2a220bed85e7af09f62d34de339b168d9507b87 (diff) |
Use multiple threads for pruning the session caches.
Diffstat (limited to 'searchcore')
3 files changed, 58 insertions, 29 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp index 1e74d83185f..f00ce4ab00c 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp @@ -3,6 +3,9 @@ #include "sessionmanager.h" #include <vespa/vespalib/stllike/lrucache_map.hpp> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/foreground_thread_executor.h> #include <mutex> #include <algorithm> @@ -50,10 +53,6 @@ struct SessionCache : SessionCacheBase { } return ret; } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector<EntryUP> toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector<EntryUP> stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector<EntryUP> toDestruct; std::lock_guard<std::mutex> guard(_lock); @@ -103,10 +102,6 @@ struct SessionMap : SessionCacheBase { } return EntrySP(); } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector<EntrySP> toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector<EntrySP> stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector<EntrySP> toDestruct; std::vector<SessionId> keys; @@ -151,7 +146,8 @@ struct SessionMap : SessionCacheBase { } }; -void SessionCacheBase::entryDropped(const SessionId &id) { +void +SessionCacheBase::entryDropped(const SessionId &id) { LOG(debug, "Session cache is full, dropping entry to fit session '%s'", id.c_str()); _stats.numDropped++; } @@ -179,19 +175,23 @@ SessionManager::~SessionManager() { assert(_search_map->empty()); } -void SessionManager::insert(search::grouping::GroupingSession::UP session) { +void +SessionManager::insert(search::grouping::GroupingSession::UP session) { _grouping_cache->insert(std::move(session)); } -void SessionManager::insert(SearchSession::SP session) { +void +SessionManager::insert(SearchSession::SP session) { _search_map->insert(std::move(session)); } -GroupingSession::UP SessionManager::pickGrouping(const SessionId &id) { +GroupingSession::UP +SessionManager::pickGrouping(const SessionId &id) { return _grouping_cache->pick(id); } -SearchSession::SP SessionManager::pickSearch(const SessionId &id) { +SearchSession::SP +SessionManager::pickSearch(const SessionId &id) { return _search_map->pick(id); } @@ -199,33 +199,58 @@ std::vector<SessionManager::SearchSessionInfo> SessionManager::getSortedSearchSessionInfo() const { std::vector<SearchSessionInfo> sessions; - _search_map->each([&sessions](const SearchSession &session) - { - sessions.emplace_back(session.getSessionId(), - session.getCreateTime(), - session.getTimeOfDoom()); - }); + _search_map->each([&sessions](const SearchSession &session) { + sessions.emplace_back(session.getSessionId(), session.getCreateTime(), session.getTimeOfDoom()); + }); std::sort(sessions.begin(), sessions.end(), - [](const SearchSessionInfo &a, - const SearchSessionInfo &b) - { + [](const SearchSessionInfo &a, const SearchSessionInfo &b) { return (a.created < b.created); }); return sessions; } -void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { - _grouping_cache->pruneTimedOutSessions(currentTime); - _search_map->pruneTimedOutSessions(currentTime); +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { + vespalib::ForegroundThreadExecutor executor; + pruneTimedOutSessions(currentTime, executor); +} + +namespace { + +template <typename T> +void +split_and_execute(std::vector<T> tasks, vespalib::ThreadExecutor & executor) { + size_t num_bundles = std::min(tasks.size(), 2*executor.getNumThreads()); + std::vector<std::vector<T>> bundles(num_bundles); + for (size_t i = 0; i < tasks.size(); i++) { + bundles[i%bundles.size()].push_back(std::move(tasks[i])); + } + for (size_t i = 0; i < bundles.size(); i++) { + executor.execute(vespalib::makeLambdaTask([part=std::move(bundles[i])]() { + // Objects will be destructed in the given executor; + })); + } +} + +} +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) { + auto groupings = _grouping_cache->stealTimedOutSessions(currentTime); + auto queries = _search_map->stealTimedOutSessions(currentTime); + split_and_execute(std::move(groupings), executor); + split_and_execute(std::move(queries), executor); } -SessionManager::Stats SessionManager::getGroupingStats() { +SessionManager::Stats +SessionManager::getGroupingStats() { return _grouping_cache->getStats(); } -SessionManager::Stats SessionManager::getSearchStats() { +SessionManager::Stats +SessionManager::getSearchStats() { return _search_map->getStats(); } -size_t SessionManager::getNumSearchSessions() const { +size_t +SessionManager::getNumSearchSessions() const { return _search_map->size(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h index 3ed4760b52e..5c881aeb6fd 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h @@ -6,6 +6,7 @@ #include <vespa/searchcore/grouping/sessionid.h> #include <vespa/vespalib/stllike/lrucache_map.h> +namespace vespalib { class ThreadExecutor; } namespace proton::matching { using SessionId = vespalib::string; @@ -59,6 +60,7 @@ public: std::vector<SearchSessionInfo> getSortedSearchSessionInfo() const; void pruneTimedOutSessions(vespalib::steady_time currentTime); + void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor); }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 7f3dc02aba8..d70bff52ed4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -380,7 +380,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _flushEngine->start(); vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval); - _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); + _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { + _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now(), _shared_service->shared()); + }), pruneSessionsInterval, pruneSessionsInterval); _isInitializing = false; _protonConfigurer.setAllowReconfig(true); _initComplete = true; |