diff options
Diffstat (limited to 'searchcore')
4 files changed, 76 insertions, 53 deletions
diff --git a/searchcore/src/tests/grouping/grouping.cpp b/searchcore/src/tests/grouping/grouping.cpp index f71912b7100..eabbaf3d50f 100644 --- a/searchcore/src/tests/grouping/grouping.cpp +++ b/searchcore/src/tests/grouping/grouping.cpp @@ -197,8 +197,8 @@ TEST_F("testGroupingContextUsage", DoomFixture()) { .addLevel(createGL(MU<AttributeNode>("attr3"), MU<AttributeNode>("attr1"))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); - GroupingContext::GroupingPtr r2(new Grouping(request2)); + auto r1 = std::make_shared<Grouping>(request1); + auto r2 = std::make_shared<Grouping>(request2); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); ASSERT_TRUE(context.empty()); context.addGrouping(r1); @@ -222,7 +222,7 @@ TEST_F("testGroupingContextSerializing", DoomFixture()) { baseRequest.serialize(nos); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr bp(new Grouping(baseRequest)); + auto bp = std::make_shared<Grouping>(baseRequest); context.addGrouping(bp); context.serialize(); vespalib::nbostream & res(context.getResult()); @@ -240,7 +240,7 @@ TEST_F("testGroupingManager", DoomFixture()) { .addLevel(createGL(MU<AttributeNode>("attr2"), MU<AttributeNode>("attr3"))); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr bp(new Grouping(request1)); + auto bp = std::make_shared<Grouping>(request1); context.addGrouping(bp); GroupingManager manager(context); ASSERT_TRUE(!manager.empty()); @@ -272,8 +272,8 @@ TEST_F("testGroupingSession", DoomFixture()) { request2.select(attrCheck, attrCheck); EXPECT_EQUAL(0u, attrCheck._numrefs); - GroupingContext::GroupingPtr r1(new Grouping(request1)); - GroupingContext::GroupingPtr r2(new Grouping(request2)); + auto r1 = std::make_shared<Grouping>(request1); + auto r2 = std::make_shared<Grouping>(request2); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); initContext.addGrouping(r2); @@ -307,7 +307,7 @@ TEST_F("testGroupingSession", DoomFixture()) { // Test second pass { GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr r(new Grouping(request1)); + auto r = std::make_shared<Grouping>(request1); r->setFirstLevel(1); r->setLastLevel(1); context.addGrouping(r); @@ -318,7 +318,7 @@ TEST_F("testGroupingSession", DoomFixture()) { // Test last pass. Session should be marked as finished { GroupingContext context(f1.clock.clock(), f1.timeOfDoom); - GroupingContext::GroupingPtr r(new Grouping(request1)); + auto r = std::make_shared<Grouping>(request1); r->setFirstLevel(2); r->setLastLevel(2); context.addGrouping(r); @@ -340,7 +340,7 @@ TEST_F("testEmptySessionId", DoomFixture()) { .addLevel(createGL(MU<AttributeNode>("attr1"), MU<AttributeNode>("attr2"))) .addLevel(createGL(MU<AttributeNode>("attr2"), MU<AttributeNode>("attr3"))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); + auto r1 = std::make_shared<Grouping>(request1); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); SessionId id; @@ -373,7 +373,7 @@ TEST_F("testSessionManager", DoomFixture()) { .setExpression(MU<AttributeNode>("attr0")) .setResult(Int64ResultNode(0)))); - GroupingContext::GroupingPtr r1(new Grouping(request1)); + auto r1 = std::make_shared<Grouping>(request1); GroupingContext initContext(f1.clock.clock(), f1.timeOfDoom); initContext.addGrouping(r1); @@ -381,9 +381,9 @@ TEST_F("testSessionManager", DoomFixture()) { SessionId id1("foo"); SessionId id2("bar"); SessionId id3("baz"); - GroupingSession::UP s1(new GroupingSession(id1, initContext, world.attributeContext)); - GroupingSession::UP s2(new GroupingSession(id2, initContext, world.attributeContext)); - GroupingSession::UP s3(new GroupingSession(id3, initContext, world.attributeContext)); + auto s1 = std::make_unique<GroupingSession>(id1, initContext, world.attributeContext); + auto s2 = std::make_unique<GroupingSession>(id2, initContext, world.attributeContext); + auto s3 = std::make_unique<GroupingSession>(id3, initContext, world.attributeContext); ASSERT_EQUAL(f1.timeOfDoom, s1->getTimeOfDoom()); mgr.insert(std::move(s1)); @@ -431,7 +431,7 @@ TEST_F("test grouping fork/join", DoomFixture()) { .setFirstLevel(0) .setLastLevel(1); - GroupingContext::GroupingPtr g1(new Grouping(request)); + auto g1 = std::make_shared<Grouping>(request); GroupingContext context(f1.clock.clock(), f1.timeOfDoom); context.addGrouping(g1); GroupingSession session(SessionId(), context, world.attributeContext); @@ -476,24 +476,20 @@ TEST_F("test session timeout", DoomFixture()) { GroupingContext initContext1(f1.clock.clock(), steady_time(duration(10))); GroupingContext initContext2(f1.clock.clock(), steady_time(duration(20))); - GroupingSession::UP s1(new GroupingSession(id1, initContext1, world.attributeContext)); - GroupingSession::UP s2(new GroupingSession(id2, initContext2, world.attributeContext)); + auto s1 = std::make_unique<GroupingSession>(id1, initContext1, world.attributeContext); + auto s2 = std::make_unique<GroupingSession>(id2, initContext2, world.attributeContext); mgr.insert(std::move(s1)); mgr.insert(std::move(s2)); mgr.pruneTimedOutSessions(steady_time(5ns)); - SessionManager::Stats stats(mgr.getGroupingStats()); - ASSERT_EQUAL(2u, stats.numCached); + ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(10ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(2u, stats.numCached); + ASSERT_EQUAL(2u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(11ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(1u, stats.numCached); + ASSERT_EQUAL(1u, mgr.getGroupingStats().numCached); mgr.pruneTimedOutSessions(steady_time(21ns)); - stats = mgr.getGroupingStats(); - ASSERT_EQUAL(0u, stats.numCached); + ASSERT_EQUAL(0u, mgr.getGroupingStats().numCached); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp index 1e74d83185f..b78d638d702 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.cpp @@ -3,6 +3,8 @@ #include "sessionmanager.h" #include <vespa/vespalib/stllike/lrucache_map.hpp> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/foreground_thread_executor.h> #include <mutex> #include <algorithm> @@ -50,10 +52,6 @@ struct SessionCache : SessionCacheBase { } return ret; } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector<EntryUP> toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector<EntryUP> stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector<EntryUP> toDestruct; std::lock_guard<std::mutex> guard(_lock); @@ -103,10 +101,6 @@ struct SessionMap : SessionCacheBase { } return EntrySP(); } - void pruneTimedOutSessions(vespalib::steady_time currentTime) { - std::vector<EntrySP> toDestruct = stealTimedOutSessions(currentTime); - toDestruct.clear(); - } std::vector<EntrySP> stealTimedOutSessions(vespalib::steady_time currentTime) { std::vector<EntrySP> toDestruct; std::vector<SessionId> keys; @@ -151,7 +145,8 @@ struct SessionMap : SessionCacheBase { } }; -void SessionCacheBase::entryDropped(const SessionId &id) { +void +SessionCacheBase::entryDropped(const SessionId &id) { LOG(debug, "Session cache is full, dropping entry to fit session '%s'", id.c_str()); _stats.numDropped++; } @@ -179,19 +174,23 @@ SessionManager::~SessionManager() { assert(_search_map->empty()); } -void SessionManager::insert(search::grouping::GroupingSession::UP session) { +void +SessionManager::insert(search::grouping::GroupingSession::UP session) { _grouping_cache->insert(std::move(session)); } -void SessionManager::insert(SearchSession::SP session) { +void +SessionManager::insert(SearchSession::SP session) { _search_map->insert(std::move(session)); } -GroupingSession::UP SessionManager::pickGrouping(const SessionId &id) { +GroupingSession::UP +SessionManager::pickGrouping(const SessionId &id) { return _grouping_cache->pick(id); } -SearchSession::SP SessionManager::pickSearch(const SessionId &id) { +SearchSession::SP +SessionManager::pickSearch(const SessionId &id) { return _search_map->pick(id); } @@ -199,33 +198,56 @@ std::vector<SessionManager::SearchSessionInfo> SessionManager::getSortedSearchSessionInfo() const { std::vector<SearchSessionInfo> sessions; - _search_map->each([&sessions](const SearchSession &session) - { - sessions.emplace_back(session.getSessionId(), - session.getCreateTime(), - session.getTimeOfDoom()); - }); + _search_map->each([&sessions](const SearchSession &session) { + sessions.emplace_back(session.getSessionId(), session.getCreateTime(), session.getTimeOfDoom()); + }); std::sort(sessions.begin(), sessions.end(), - [](const SearchSessionInfo &a, - const SearchSessionInfo &b) - { + [](const SearchSessionInfo &a, const SearchSessionInfo &b) { return (a.created < b.created); }); return sessions; } -void SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { - _grouping_cache->pruneTimedOutSessions(currentTime); - _search_map->pruneTimedOutSessions(currentTime); +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime) { + vespalib::ForegroundThreadExecutor executor; + pruneTimedOutSessions(currentTime, executor); +} + +namespace { + +template <typename T> +void +split_and_execute(std::vector<T> tasks, vespalib::ThreadExecutor & executor) { + size_t num_bundles = std::max(1ul, std::min(tasks.size(), 2*executor.getNumThreads())); + std::vector<std::vector<T>> bundles(num_bundles); + for (size_t i = 0; i < tasks.size(); i++) { + bundles[i%bundles.size()].push_back(std::move(tasks[i])); + } + for (size_t i = 0; i < bundles.size(); i++) { + executor.execute(vespalib::makeLambdaTask([part=std::move(bundles[i])]() { + // Objects will be destructed in the given executor; + })); + } +} + +} +void +SessionManager::pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor) { + split_and_execute(_grouping_cache->stealTimedOutSessions(currentTime), executor); + split_and_execute(_search_map->stealTimedOutSessions(currentTime), executor); } -SessionManager::Stats SessionManager::getGroupingStats() { +SessionManager::Stats +SessionManager::getGroupingStats() { return _grouping_cache->getStats(); } -SessionManager::Stats SessionManager::getSearchStats() { +SessionManager::Stats +SessionManager::getSearchStats() { return _search_map->getStats(); } -size_t SessionManager::getNumSearchSessions() const { +size_t +SessionManager::getNumSearchSessions() const { return _search_map->size(); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h index 3ed4760b52e..872f1a90bd1 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h +++ b/searchcore/src/vespa/searchcore/proton/matching/sessionmanager.h @@ -6,6 +6,7 @@ #include <vespa/searchcore/grouping/sessionid.h> #include <vespa/vespalib/stllike/lrucache_map.h> +namespace vespalib { class ThreadExecutor; } namespace proton::matching { using SessionId = vespalib::string; @@ -58,6 +59,8 @@ public: size_t getNumSearchSessions() const; std::vector<SearchSessionInfo> getSortedSearchSessionInfo() const; + void pruneTimedOutSessions(vespalib::steady_time currentTime, vespalib::ThreadExecutor & executor); + // Only used for testing void pruneTimedOutSessions(vespalib::steady_time currentTime); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 7f3dc02aba8..d70bff52ed4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -380,7 +380,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _flushEngine->start(); vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval); - _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); + _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { + _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now(), _shared_service->shared()); + }), pruneSessionsInterval, pruneSessionsInterval); _isInitializing = false; _protonConfigurer.setAllowReconfig(true); _initComplete = true; |