From 4cd704fd36d9016de6ed2caff9253847144a3330 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 15 May 2023 18:13:33 +0000 Subject: Use multiple threads for pruning the session caches. --- .../searchcore/proton/matching/sessionmanager.cpp | 81 ++++++++++++++-------- .../searchcore/proton/matching/sessionmanager.h | 2 + .../src/vespa/searchcore/proton/server/proton.cpp | 4 +- 3 files changed, 58 insertions(+), 29 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp index 1e74d83185f..f00ce4ab00c 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp @@ -3,6 +3,9 @@ #include "sessionmanager.h" #include #include +#include +#include +#include #include #include @@ -50,10 +53,6 @@ struct SessionCache : SessionCacheBase { } return ret; } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector toDestruct; std::lock_guard guard(_lock); @@ -103,10 +102,6 @@ struct SessionMap : SessionCacheBase { } return EntrySP(); } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector toDestruct; std::vector keys; @@ -151,7 +146,8 @@ struct SessionMap : SessionCacheBase { } }; -void SessionCacheBase::entryDropped(const SessionId &id) { +void +SessionCacheBase::entryDropped(const SessionId &id) { LOG(debug, "Session cache is full, dropping entry to fit session '%s'", id.c_str()); _stats.numDropped++; } @@ -179,19 +175,23 @@ SessionManager::~SessionManager() { assert(_search_map->empty()); } -void SessionManager::insert(search::grouping::GroupingSession::UP session) { +void +SessionManager::insert(search::grouping::GroupingSession::UP session) { _grouping_cache->insert(std::move(session)); } -void SessionManager::insert(SearchSession::SP session) { +void +SessionManager::insert(SearchSession::SP session) { _search_map->insert(std::move(session)); } -GroupingSession::UP SessionManager::pickGrouping(const SessionId &id) { +GroupingSession::UP +SessionManager::pickGrouping(const SessionId &id) { return _grouping_cache->pick(id); } -SearchSession::SP SessionManager::pickSearch(const SessionId &id) { +SearchSession::SP +SessionManager::pickSearch(const SessionId &id) { return _search_map->pick(id); } @@ -199,33 +199,58 @@ std::vector SessionManager::getSortedSearchSessionInfo() const { std::vector sessions; - _search_map->each([&sessions](const SearchSession &session) - { - sessions.emplace_back(session.getSessionId(), - session.getCreateTime(), - session.getTimeOfDoom()); - }); + _search_map->each([&sessions](const SearchSession &session) { + sessions.emplace_back(session.getSessionId(), session.getCreateTime(), session.getTimeOfDoom()); + }); std::sort(sessions.begin(), sessions.end(), - [](const SearchSessionInfo &a, - const SearchSessionInfo &b) - { + [](const SearchSessionInfo &a, const SearchSessionInfo &b) { return (a.created < b.created); }); return sessions; } -void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { - _grouping_cache->pruneTimedOutSessions(currentTime); - _search_map->pruneTimedOutSessions(currentTime); +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { + vespalib::ForegroundThreadExecutor executor; + pruneTimedOutSessions(currentTime, executor); +} + +namespace { + +template +void +split_and_execute(std::vector tasks, vespalib::ThreadExecutor & executor) { + size_t num_bundles = std::min(tasks.size(), 2*executor.getNumThreads()); + std::vector> bundles(num_bundles); + for (size_t i = 0; i < tasks.size(); i++) { + bundles[i%bundles.size()].push_back(std::move(tasks[i])); + } + for (size_t i = 0; i < bundles.size(); i++) { + executor.execute(vespalib::makeLambdaTask([part=std::move(bundles[i])]() { + // Objects will be destructed in the given executor; + })); + } +} + +} +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) { + auto groupings = _grouping_cache->stealTimedOutSessions(currentTime); + auto queries = _search_map->stealTimedOutSessions(currentTime); + split_and_execute(std::move(groupings), executor); + split_and_execute(std::move(queries), executor); } -SessionManager::Stats SessionManager::getGroupingStats() { +SessionManager::Stats +SessionManager::getGroupingStats() { return _grouping_cache->getStats(); } -SessionManager::Stats SessionManager::getSearchStats() { +SessionManager::Stats +SessionManager::getSearchStats() { return _search_map->getStats(); } -size_t SessionManager::getNumSearchSessions() const { +size_t +SessionManager::getNumSearchSessions() const { return _search_map->size(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h index 3ed4760b52e..5c881aeb6fd 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h @@ -6,6 +6,7 @@ #include #include +namespace vespalib { class ThreadExecutor; } namespace proton::matching { using SessionId = vespalib::string; @@ -59,6 +60,7 @@ public: std::vector getSortedSearchSessionInfo() const; void pruneTimedOutSessions(vespalib::steady_time currentTime); + void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor); }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 7f3dc02aba8..d70bff52ed4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -380,7 +380,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _flushEngine->start(); vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval); - _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); + _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { + _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now(), _shared_service->shared()); + }), pruneSessionsInterval, pruneSessionsInterval); _isInitializing = false; _protonConfigurer.setAllowReconfig(true); _initComplete = true; -- cgit v1.2.3 From 85724a8604d82d13605af9974828a4c696b8c8ac Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 15 May 2023 20:57:53 +0000 Subject: GC unused include --- searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp index f00ce4ab00c..8b0af7fed88 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp @@ -3,7 +3,6 @@ #include "sessionmanager.h" #include #include -#include #include #include #include -- cgit v1.2.3 From e30e0749182c0e602d7f9cbe2497ec0fdc381f89 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 16 May 2023 07:08:32 +0000 Subject: Ensure that we get at least 1 bundle. --- .../src/vespa/searchcore/proton/matching/sessionmanager.cpp | 8 +++----- searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp index 8b0af7fed88..b78d638d702 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp @@ -219,7 +219,7 @@ namespace { template void split_and_execute(std::vector tasks, vespalib::ThreadExecutor & executor) { - size_t num_bundles = std::min(tasks.size(), 2*executor.getNumThreads()); + size_t num_bundles = std::max(1ul, std::min(tasks.size(), 2*executor.getNumThreads())); std::vector> bundles(num_bundles); for (size_t i = 0; i < tasks.size(); i++) { bundles[i%bundles.size()].push_back(std::move(tasks[i])); @@ -234,10 +234,8 @@ split_and_execute(std::vector tasks, vespalib::ThreadExecutor & executor) { } void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) { - auto groupings = _grouping_cache->stealTimedOutSessions(currentTime); - auto queries = _search_map->stealTimedOutSessions(currentTime); - split_and_execute(std::move(groupings), executor); - split_and_execute(std::move(queries), executor); + split_and_execute(_grouping_cache->stealTimedOutSessions(currentTime), executor); + split_and_execute(_search_map->stealTimedOutSessions(currentTime), executor); } SessionManager::Stats diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h index 5c881aeb6fd..872f1a90bd1 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h @@ -59,8 +59,9 @@ public: size_t getNumSearchSessions() const; std::vector getSortedSearchSessionInfo() const; - void pruneTimedOutSessions(vespalib::steady_time currentTime); void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor); + // Only used for testing + void pruneTimedOutSessions(vespalib::steady_time currentTime); }; } -- cgit v1.2.3 From a9267052590830936fdbe80c54653507d5f2186c Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 16 May 2023 07:09:11 +0000 Subject: Use std::make_unique/make_shared instead of explicit new. --- searchcore/src/tests/grouping/grouping.cpp | 44 ++++++++++++++---------------- 1 file changed, 20 insertions(+), 24 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/tests/grouping/grouping.cpp b/searchcore/src/tests/grouping/grouping.cpp index f71912b7100..eabbaf3d50f 100644 --- a/searchcore/src/tests/grouping/grouping.cpp +++ b/searchcore/src/tests/grouping/grouping.cpp @@ -197,8 +197,8 @@ TEST_F("testGroupingContextUsage", DoomFixture()) { .addLevel(createGL(MU("attr3"), MU("attr1"))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); - GroupingContext::GroupingPtr r2(new Grouping(request2)); + auto r1 = std::make_shared(request1); + auto r2 = std::make_shared(request2); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); ASSERT_TRUE(context.empty()); context.addGrouping(r1); @@ -222,7 +222,7 @@ TEST_F("testGroupingContextSerializing", DoomFixture()) { baseRequest.serialize(nos); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr bp(new Grouping(baseRequest)); + auto bp = std::make_shared(baseRequest); context.addGrouping(bp); context.serialize(); vespalib::nbostream & res(context.getResult()); @@ -240,7 +240,7 @@ TEST_F("testGroupingManager", DoomFixture()) { .addLevel(createGL(MU("attr2"), MU("attr3"))); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr bp(new Grouping(request1)); + auto bp = std::make_shared(request1); context.addGrouping(bp); GroupingManager manager(context); ASSERT_TRUE(!manager.empty()); @@ -272,8 +272,8 @@ TEST_F("testGroupingSession", DoomFixture()) { request2.select(attrCheck, attrCheck); EXPECT_EQUAL(0u, attrCheck._numrefs); - GroupingContext::GroupingPtr r1(new Grouping(request1)); - GroupingContext::GroupingPtr r2(new Grouping(request2)); + auto r1 = std::make_shared(request1); + auto r2 = std::make_shared(request2); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); initContext.addGrouping(r2); @@ -307,7 +307,7 @@ TEST_F("testGroupingSession", DoomFixture()) { // Test second pass { GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr r(new Grouping(request1)); + auto r = std::make_shared(request1); r->setFirstLevel(1); r->setLastLevel(1); context.addGrouping(r); @@ -318,7 +318,7 @@ TEST_F("testGroupingSession", DoomFixture()) { // Test last pass. Session should be marked as finished { GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr r(new Grouping(request1)); + auto r = std::make_shared(request1); r->setFirstLevel(2); r->setLastLevel(2); context.addGrouping(r); @@ -340,7 +340,7 @@ TEST_F("testEmptySessionId", DoomFixture()) { .addLevel(createGL(MU("attr1"), MU("attr2"))) .addLevel(createGL(MU("attr2"), MU("attr3"))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); + auto r1 = std::make_shared(request1); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); SessionId id; @@ -373,7 +373,7 @@ TEST_F("testSessionManager", DoomFixture()) { .setExpression(MU("attr0")) .setResult(Int64ResultNode(0)))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); + auto r1 = std::make_shared(request1); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); @@ -381,9 +381,9 @@ TEST_F("testSessionManager", DoomFixture()) { SessionId id1("foo"); SessionId id2("bar"); SessionId id3("baz"); - GroupingSession::UP s1(new GroupingSession(id1, initContext, world.attributeContext)); - GroupingSession::UP s2(new GroupingSession(id2, initContext, world.attributeContext)); - GroupingSession::UP s3(new GroupingSession(id3, initContext, world.attributeContext)); + auto s1 = std::make_unique(id1, initContext, world.attributeContext); + auto s2 = std::make_unique(id2, initContext, world.attributeContext); + auto s3 = std::make_unique(id3, initContext, world.attributeContext); ASSERT_EQUAL(f1.timeOfDoom, s1->getTimeOfDoom()); mgr.insert(std::move(s1)); @@ -431,7 +431,7 @@ TEST_F("test grouping fork/join", DoomFixture()) { .setFirstLevel(0) .setLastLevel(1); - GroupingContext::GroupingPtr g1(new Grouping(request)); + auto g1 = std::make_shared(request); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); context.addGrouping(g1); GroupingSession session(SessionId(), context, world.attributeContext); @@ -476,24 +476,20 @@ TEST_F("test session timeout", DoomFixture()) { GroupingContext initContext1(f1.clock.clock(), steady_time(duration(10))); GroupingContext initContext2(f1.clock.clock(), steady_time(duration(20))); - GroupingSession::UP s1(new GroupingSession(id1, initContext1, world.attributeContext)); - GroupingSession::UP s2(new GroupingSession(id2, initContext2, world.attributeContext)); + auto s1 = std::make_unique(id1, initContext1, world.attributeContext); + auto s2 = std::make_unique(id2, initContext2, world.attributeContext); mgr.insert(std::move(s1)); mgr.insert(std::move(s2)); mgr.pruneTimedOutSessions(steady_time(5ns)); - SessionManager::Stats stats(mgr.getGroupingStats()); - ASSERT_EQUAL(2u, stats.numCached); + ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(10ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(2u, stats.numCached); + ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(11ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(1u, stats.numCached); + ASSERT_EQUAL(1u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(21ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(0u, stats.numCached); + ASSERT_EQUAL(0u, mgr.getGroupingStats().numCached); } TEST_MAIN() { TEST_RUN_ALL(); } -- cgit v1.2.3