summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-16 12:02:11 +0200
committerGitHub <noreply@github.com>2023-05-16 12:02:11 +0200
commit2a98928849c14a88a6c0e71b03965da393893edc (patch)
treebb65663c39d983794bdfba341dfb4bd54bcb20d6 /searchcore
parent511629d53ffbbbe93f2413573ca43cb3d766a2b4 (diff)
parenta9267052590830936fdbe80c54653507d5f2186c (diff)
Merge pull request #27123 from vespa-engine/balder/use-multiple-threads-for-pruning
Use multiple threads for pruning the session caches.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/grouping/grouping.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp78
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp4
4 files changed, 76 insertions, 53 deletions
diff --git a/searchcore/src/tests/grouping/grouping.cpp b/searchcore/src/tests/grouping/grouping.cpp
index f71912b7100..eabbaf3d50f 100644
--- a/searchcore/src/tests/grouping/grouping.cpp
+++ b/searchcore/src/tests/grouping/grouping.cpp
@@ -197,8 +197,8 @@ TEST_F("testGroupingContextUsage", DoomFixture()) {
.addLevel(createGL(MU<AttributeNode>("attr3"), MU<AttributeNode>("attr1")));
- GroupingContext::GroupingPtr r1(new Grouping(request1));
- GroupingContext::GroupingPtr r2(new Grouping(request2));
+ auto r1 = std::make_shared<Grouping>(request1);
+ auto r2 = std::make_shared<Grouping>(request2);
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
ASSERT_TRUE(context.empty());
context.addGrouping(r1);
@@ -222,7 +222,7 @@ TEST_F("testGroupingContextSerializing", DoomFixture()) {
baseRequest.serialize(nos);
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
- GroupingContext::GroupingPtr bp(new Grouping(baseRequest));
+ auto bp = std::make_shared<Grouping>(baseRequest);
context.addGrouping(bp);
context.serialize();
vespalib::nbostream & res(context.getResult());
@@ -240,7 +240,7 @@ TEST_F("testGroupingManager", DoomFixture()) {
.addLevel(createGL(MU<AttributeNode>("attr2"), MU<AttributeNode>("attr3")));
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
- GroupingContext::GroupingPtr bp(new Grouping(request1));
+ auto bp = std::make_shared<Grouping>(request1);
context.addGrouping(bp);
GroupingManager manager(context);
ASSERT_TRUE(!manager.empty());
@@ -272,8 +272,8 @@ TEST_F("testGroupingSession", DoomFixture()) {
request2.select(attrCheck, attrCheck);
EXPECT_EQUAL(0u, attrCheck._numrefs);
- GroupingContext::GroupingPtr r1(new Grouping(request1));
- GroupingContext::GroupingPtr r2(new Grouping(request2));
+ auto r1 = std::make_shared<Grouping>(request1);
+ auto r2 = std::make_shared<Grouping>(request2);
GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom);
initContext.addGrouping(r1);
initContext.addGrouping(r2);
@@ -307,7 +307,7 @@ TEST_F("testGroupingSession", DoomFixture()) {
// Test second pass
{
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
- GroupingContext::GroupingPtr r(new Grouping(request1));
+ auto r = std::make_shared<Grouping>(request1);
r->setFirstLevel(1);
r->setLastLevel(1);
context.addGrouping(r);
@@ -318,7 +318,7 @@ TEST_F("testGroupingSession", DoomFixture()) {
// Test last pass. Session should be marked as finished
{
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
- GroupingContext::GroupingPtr r(new Grouping(request1));
+ auto r = std::make_shared<Grouping>(request1);
r->setFirstLevel(2);
r->setLastLevel(2);
context.addGrouping(r);
@@ -340,7 +340,7 @@ TEST_F("testEmptySessionId", DoomFixture()) {
.addLevel(createGL(MU<AttributeNode>("attr1"), MU<AttributeNode>("attr2")))
.addLevel(createGL(MU<AttributeNode>("attr2"), MU<AttributeNode>("attr3")));
- GroupingContext::GroupingPtr r1(new Grouping(request1));
+ auto r1 = std::make_shared<Grouping>(request1);
GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom);
initContext.addGrouping(r1);
SessionId id;
@@ -373,7 +373,7 @@ TEST_F("testSessionManager", DoomFixture()) {
.setExpression(MU<AttributeNode>("attr0"))
.setResult(Int64ResultNode(0))));
- GroupingContext::GroupingPtr r1(new Grouping(request1));
+ auto r1 = std::make_shared<Grouping>(request1);
GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom);
initContext.addGrouping(r1);
@@ -381,9 +381,9 @@ TEST_F("testSessionManager", DoomFixture()) {
SessionId id1("foo");
SessionId id2("bar");
SessionId id3("baz");
- GroupingSession::UP s1(new GroupingSession(id1, initContext, world.attributeContext));
- GroupingSession::UP s2(new GroupingSession(id2, initContext, world.attributeContext));
- GroupingSession::UP s3(new GroupingSession(id3, initContext, world.attributeContext));
+ auto s1 = std::make_unique<GroupingSession>(id1, initContext, world.attributeContext);
+ auto s2 = std::make_unique<GroupingSession>(id2, initContext, world.attributeContext);
+ auto s3 = std::make_unique<GroupingSession>(id3, initContext, world.attributeContext);
ASSERT_EQUAL(f1.timeOfDoom, s1->getTimeOfDoom());
mgr.insert(std::move(s1));
@@ -431,7 +431,7 @@ TEST_F("test grouping fork/join", DoomFixture()) {
.setFirstLevel(0)
.setLastLevel(1);
- GroupingContext::GroupingPtr g1(new Grouping(request));
+ auto g1 = std::make_shared<Grouping>(request);
GroupingContext context(f1.clock.clock(), f1.timeOfDoom);
context.addGrouping(g1);
GroupingSession session(SessionId(), context, world.attributeContext);
@@ -476,24 +476,20 @@ TEST_F("test session timeout", DoomFixture()) {
GroupingContext initContext1(f1.clock.clock(), steady_time(duration(10)));
GroupingContext initContext2(f1.clock.clock(), steady_time(duration(20)));
- GroupingSession::UP s1(new GroupingSession(id1, initContext1, world.attributeContext));
- GroupingSession::UP s2(new GroupingSession(id2, initContext2, world.attributeContext));
+ auto s1 = std::make_unique<GroupingSession>(id1, initContext1, world.attributeContext);
+ auto s2 = std::make_unique<GroupingSession>(id2, initContext2, world.attributeContext);
mgr.insert(std::move(s1));
mgr.insert(std::move(s2));
mgr.pruneTimedOutSessions(steady_time(5ns));
- SessionManager::Stats stats(mgr.getGroupingStats());
- ASSERT_EQUAL(2u, stats.numCached);
+ ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached);
mgr.pruneTimedOutSessions(steady_time(10ns));
- stats = mgr.getGroupingStats();
- ASSERT_EQUAL(2u, stats.numCached);
+ ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached);
mgr.pruneTimedOutSessions(steady_time(11ns));
- stats = mgr.getGroupingStats();
- ASSERT_EQUAL(1u, stats.numCached);
+ ASSERT_EQUAL(1u, mgr.getGroupingStats().numCached);
mgr.pruneTimedOutSessions(steady_time(21ns));
- stats = mgr.getGroupingStats();
- ASSERT_EQUAL(0u, stats.numCached);
+ ASSERT_EQUAL(0u, mgr.getGroupingStats().numCached);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
index 1e74d83185f..b78d638d702 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp
@@ -3,6 +3,8 @@
#include "sessionmanager.h"
#include <vespa/vespalib/stllike/lrucache_map.hpp>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/foreground_thread_executor.h>
#include <mutex>
#include <algorithm>
@@ -50,10 +52,6 @@ struct SessionCache : SessionCacheBase {
}
return ret;
}
- void pruneTimedOutSessions(vespalib::steady_time currentTime) {
- std::vector<EntryUP> toDestruct = stealTimedOutSessions(currentTime);
- toDestruct.clear();
- }
std::vector<EntryUP> stealTimedOutSessions(vespalib::steady_time currentTime) {
std::vector<EntryUP> toDestruct;
std::lock_guard<std::mutex> guard(_lock);
@@ -103,10 +101,6 @@ struct SessionMap : SessionCacheBase {
}
return EntrySP();
}
- void pruneTimedOutSessions(vespalib::steady_time currentTime) {
- std::vector<EntrySP> toDestruct = stealTimedOutSessions(currentTime);
- toDestruct.clear();
- }
std::vector<EntrySP> stealTimedOutSessions(vespalib::steady_time currentTime) {
std::vector<EntrySP> toDestruct;
std::vector<SessionId> keys;
@@ -151,7 +145,8 @@ struct SessionMap : SessionCacheBase {
}
};
-void SessionCacheBase::entryDropped(const SessionId &id) {
+void
+SessionCacheBase::entryDropped(const SessionId &id) {
LOG(debug, "Session cache is full, dropping entry to fit session '%s'", id.c_str());
_stats.numDropped++;
}
@@ -179,19 +174,23 @@ SessionManager::~SessionManager() {
assert(_search_map->empty());
}
-void SessionManager::insert(search::grouping::GroupingSession::UP session) {
+void
+SessionManager::insert(search::grouping::GroupingSession::UP session) {
_grouping_cache->insert(std::move(session));
}
-void SessionManager::insert(SearchSession::SP session) {
+void
+SessionManager::insert(SearchSession::SP session) {
_search_map->insert(std::move(session));
}
-GroupingSession::UP SessionManager::pickGrouping(const SessionId &id) {
+GroupingSession::UP
+SessionManager::pickGrouping(const SessionId &id) {
return _grouping_cache->pick(id);
}
-SearchSession::SP SessionManager::pickSearch(const SessionId &id) {
+SearchSession::SP
+SessionManager::pickSearch(const SessionId &id) {
return _search_map->pick(id);
}
@@ -199,33 +198,56 @@ std::vector<SessionManager::SearchSessionInfo>
SessionManager::getSortedSearchSessionInfo() const
{
std::vector<SearchSessionInfo> sessions;
- _search_map->each([&sessions](const SearchSession &session)
- {
- sessions.emplace_back(session.getSessionId(),
- session.getCreateTime(),
- session.getTimeOfDoom());
- });
+ _search_map->each([&sessions](const SearchSession &session) {
+ sessions.emplace_back(session.getSessionId(), session.getCreateTime(), session.getTimeOfDoom());
+ });
std::sort(sessions.begin(), sessions.end(),
- [](const SearchSessionInfo &a,
- const SearchSessionInfo &b)
- {
+ [](const SearchSessionInfo &a, const SearchSessionInfo &b) {
return (a.created < b.created);
});
return sessions;
}
-void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) {
- _grouping_cache->pruneTimedOutSessions(currentTime);
- _search_map->pruneTimedOutSessions(currentTime);
+void
+SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) {
+ vespalib::ForegroundThreadExecutor executor;
+ pruneTimedOutSessions(currentTime, executor);
+}
+
+namespace {
+
+template <typename T>
+void
+split_and_execute(std::vector<T> tasks, vespalib::ThreadExecutor & executor) {
+ size_t num_bundles = std::max(1ul, std::min(tasks.size(), 2*executor.getNumThreads()));
+ std::vector<std::vector<T>> bundles(num_bundles);
+ for (size_t i = 0; i < tasks.size(); i++) {
+ bundles[i%bundles.size()].push_back(std::move(tasks[i]));
+ }
+ for (size_t i = 0; i < bundles.size(); i++) {
+ executor.execute(vespalib::makeLambdaTask([part=std::move(bundles[i])]() {
+ // Objects will be destructed in the given executor;
+ }));
+ }
+}
+
+}
+void
+SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) {
+ split_and_execute(_grouping_cache->stealTimedOutSessions(currentTime), executor);
+ split_and_execute(_search_map->stealTimedOutSessions(currentTime), executor);
}
-SessionManager::Stats SessionManager::getGroupingStats() {
+SessionManager::Stats
+SessionManager::getGroupingStats() {
return _grouping_cache->getStats();
}
-SessionManager::Stats SessionManager::getSearchStats() {
+SessionManager::Stats
+SessionManager::getSearchStats() {
return _search_map->getStats();
}
-size_t SessionManager::getNumSearchSessions() const {
+size_t
+SessionManager::getNumSearchSessions() const {
return _search_map->size();
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
index 3ed4760b52e..872f1a90bd1 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h
@@ -6,6 +6,7 @@
#include <vespa/searchcore/grouping/sessionid.h>
#include <vespa/vespalib/stllike/lrucache_map.h>
+namespace vespalib { class ThreadExecutor; }
namespace proton::matching {
using SessionId = vespalib::string;
@@ -58,6 +59,8 @@ public:
size_t getNumSearchSessions() const;
std::vector<SearchSessionInfo> getSortedSearchSessionInfo() const;
+ void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor);
+ // Only used for testing
void pruneTimedOutSessions(vespalib::steady_time currentTime);
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 7f3dc02aba8..d70bff52ed4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -380,7 +380,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_flushEngine->start();
vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval);
- _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval);
+ _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() {
+ _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now(), _shared_service->shared());
+ }), pruneSessionsInterval, pruneSessionsInterval);
_isInitializing = false;
_protonConfigurer.setAllowReconfig(true);
_initComplete = true;