diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-09-20 21:20:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-20 21:20:01 +0200 |
commit | 18bb86ef0aab3a9d5f43af1227731a93818b89ab (patch) | |
tree | f13c72019273aaa2d3a0ac0ae7c016451478a8dc | |
parent | 4e04bf3134a057983d6144aa8875aec169d57ba2 (diff) | |
parent | 27e95a666bb5f3066fd23b52e3b21df48dff164e (diff) |
Merge pull request #28575 from vespa-engine/balder/softdoom-early
Balder/softdoom early
7 files changed, 102 insertions, 75 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp index 9db7eef840d..25b16909a42 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp @@ -12,18 +12,22 @@ LOG_SETUP(".proton.matchengine.matchengine"); +using search::engine::SearchRequest; +using search::engine::SearchReply; +using search::engine::SearchClient; + +using namespace search::fef::indexproperties; + namespace { class SearchTask : public vespalib::Executor::Task { private: - proton::MatchEngine &_engine; - search::engine::SearchRequest::Source _request; - search::engine::SearchClient &_client; + proton::MatchEngine &_engine; + SearchRequest::Source _request; + SearchClient &_client; public: - SearchTask(proton::MatchEngine &engine, - search::engine::SearchRequest::Source request, - search::engine::SearchClient &client) + SearchTask(proton::MatchEngine &engine, SearchRequest::Source request, SearchClient &client) : _engine(engine), _request(std::move(request)), _client(client) @@ -100,13 +104,12 @@ MatchEngine::removeSearchHandler(const DocTypeName &docTypeName) return _handlers.removeHandler(docTypeName); } -search::engine::SearchReply::UP -MatchEngine::search(search::engine::SearchRequest::Source request, - search::engine::SearchClient &client) +SearchReply::UP +MatchEngine::search(SearchRequest::Source request, SearchClient &client) { // We continue to allow searches if the node is in Maintenance mode if (_closed || (!_nodeUp && !_nodeMaintenance.load(std::memory_order_relaxed))) { - auto ret = std::make_unique<search::engine::SearchReply>(); + auto ret = std::make_unique<SearchReply>(); ret->setDistributionKey(_distributionKey); // TODO: Notify closed. @@ -120,43 +123,51 @@ MatchEngine::search(search::engine::SearchRequest::Source request, return performSearch(std::move(request)); } -std::unique_ptr<search::engine::SearchReply> -MatchEngine::performSearch(search::engine::SearchRequest::Source req) +std::unique_ptr<SearchReply> +MatchEngine::doSearch(const SearchRequest & searchRequest) { + if (searchRequest.expired()) { + vespalib::Issue::report("Query timed out in the query Q."); + return std::make_unique<SearchReply>(); + } + // 3 is the minimum level required for backend tracing. + searchRequest.setTraceLevel(trace::Level::lookup(searchRequest.propertiesMap.modelOverrides(), + searchRequest.trace().getLevel()), 3); + ISearchHandler::SP searchHandler; + auto threadBundle = _threadBundlePool.getBundle(); + { // try to find the match handler corresponding to the specified search doc type + DocTypeName docTypeName(searchRequest); + std::lock_guard<std::mutex> guard(_lock); + searchHandler = _handlers.getHandler(docTypeName); + } + std::unique_ptr<SearchReply> ret; + if (searchHandler) { + ret = searchHandler->match(searchRequest, threadBundle.bundle()); + } else { + HandlerMap<ISearchHandler>::Snapshot snapshot; + { + std::lock_guard<std::mutex> guard(_lock); + snapshot = _handlers.snapshot(); + } + ret = (snapshot.valid()) + ? snapshot.get()->match(searchRequest, threadBundle.bundle()) // use the first handler + : std::make_unique<SearchReply>(); + } + if (searchRequest.expired()) { + vespalib::Issue::report("search request timed out; results may be incomplete"); + } + return ret; +} + +std::unique_ptr<SearchReply> +MatchEngine::performSearch(SearchRequest::Source req) { auto my_issues = std::make_unique<search::UniqueIssues>(); auto capture_issues = vespalib::Issue::listen(*my_issues); - auto ret = std::make_unique<search::engine::SearchReply>(); - - const search::engine::SearchRequest * searchRequest = req.get(); - if (searchRequest) { - // 3 is the minimum level required for backend tracing. - searchRequest->setTraceLevel(search::fef::indexproperties::trace::Level::lookup(searchRequest->propertiesMap.modelOverrides(), - searchRequest->trace().getLevel()), 3); - ISearchHandler::SP searchHandler; - vespalib::SimpleThreadBundle::UP threadBundle = _threadBundlePool.obtain(); - { // try to find the match handler corresponding to the specified search doc type - DocTypeName docTypeName(*searchRequest); - std::lock_guard<std::mutex> guard(_lock); - searchHandler = _handlers.getHandler(docTypeName); - } - if (searchHandler) { - ret = searchHandler->match(*searchRequest, *threadBundle); - } else { - HandlerMap<ISearchHandler>::Snapshot snapshot; - { - std::lock_guard<std::mutex> guard(_lock); - snapshot = _handlers.snapshot(); - } - if (snapshot.valid()) { - ret = snapshot.get()->match(*searchRequest, *threadBundle); // use the first handler - } - } - _threadBundlePool.release(std::move(threadBundle)); - if (searchRequest->expired()) { - vespalib::Issue::report("search request timed out; results may be incomplete"); - } - } + const SearchRequest * searchRequest = req.get(); + auto ret = (searchRequest) + ? doSearch(*searchRequest) + : std::make_unique<SearchReply>(); ret->request = req.release(); if (_forward_issues) { ret->my_issues = std::move(my_issues); diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h index 2979213979f..52ca419c50c 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h @@ -28,6 +28,7 @@ private: std::atomic<bool> _nodeUp; std::atomic<bool> _nodeMaintenance; + std::unique_ptr<search::engine::SearchReply> doSearch(const search::engine::SearchRequest & searchRequest); public: /** * Convenience typedefs. diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp index a353d4816f6..f62f4c60a6c 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp @@ -187,6 +187,7 @@ MatchToolsFactory(QueryLimiter & queryLimiter, _diversityParams(), _valid(false) { + if (doom.soft_doom()) return; auto trace = root_trace.make_trace(); trace.addEvent(4, "Start query setup"); _query.setWhiteListBlueprint(metaStore.createWhiteListBlueprint()); diff --git a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp index 4a4a021d6d5..eef0eb48738 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp @@ -278,6 +278,10 @@ Matcher::match(const SearchRequest &request, vespalib::ThreadBundle &threadBundl if (!mtf->valid()) { return reply; } + if (mtf->get_request_context().getDoom().soft_doom()) { + vespalib::Issue::report("Search request soft doomed during query setup and initialization."); + return reply; + } const Properties & rankProperties = request.propertiesMap.rankProperties(); uint32_t heapSize = HeapSize::lookup(rankProperties, _rankSetup->getHeapSize()); diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp index e451f1e033d..b155bb02d42 100644 --- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp +++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp @@ -160,15 +160,11 @@ TEST("require that all strategies work with variable number of threads and targe } TEST_F("require that bundle pool gives out bundles", SimpleThreadBundle::Pool(5)) { - SimpleThreadBundle::UP b1 = f1.obtain(); - SimpleThreadBundle::UP b2 = f1.obtain(); - ASSERT_TRUE(b1.get() != 0); - ASSERT_TRUE(b2.get() != 0); - EXPECT_EQUAL(5u, b1->size()); - EXPECT_EQUAL(5u, b2->size()); - EXPECT_FALSE(b1.get() == b2.get()); - f1.release(std::move(b1)); - f1.release(std::move(b2)); + auto b1 = f1.getBundle(); + auto b2 = f1.getBundle(); + EXPECT_EQUAL(5u, b1.bundle().size()); + EXPECT_EQUAL(5u, b2.bundle().size()); + EXPECT_FALSE(&b1.bundle() == &b2.bundle()); } TEST_F("require that bundles do not need to be put back on the pool", SimpleThreadBundle::Pool(5)) { @@ -178,20 +174,20 @@ TEST_F("require that bundles do not need to be put back on the pool", SimpleThre } TEST_F("require that bundle pool reuses bundles", SimpleThreadBundle::Pool(5)) { - SimpleThreadBundle::UP bundle = f1.obtain(); - SimpleThreadBundle *ptr = bundle.get(); - f1.release(std::move(bundle)); - bundle = f1.obtain(); - EXPECT_EQUAL(ptr, bundle.get()); + SimpleThreadBundle *ptr; + { + ptr = &f1.getBundle().bundle(); + } + auto bundle = f1.getBundle(); + EXPECT_EQUAL(ptr, &bundle.bundle()); } TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThreadBundle::Pool(3), std::vector<SimpleThreadBundle*>(num_threads, 0)) { - SimpleThreadBundle::UP bundle = f1.obtain(); - ASSERT_TRUE(bundle.get() != 0); - EXPECT_EQUAL(3u, bundle->size()); - f2[thread_id] = bundle.get(); + SimpleThreadBundle::Pool::Guard bundle = f1.getBundle(); + EXPECT_EQUAL(3u, bundle.bundle().size()); + f2[thread_id] = &bundle.bundle(); TEST_BARRIER(); if (thread_id == 0) { for (size_t i = 0; i < num_threads; ++i) { @@ -201,13 +197,12 @@ TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThr } } TEST_BARRIER(); - f1.release(std::move(bundle)); } struct Filler { int stuff; Filler() : stuff(0) {} - virtual ~Filler() {} + virtual ~Filler() = default; }; struct Proxy : Filler, Runnable { diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index 958bb58f34a..4a588709c60 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -87,7 +87,7 @@ SimpleThreadBundle::Pool::obtain() return ret; } } - return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun); + return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun, USE_SIGNAL_LIST); } void @@ -142,11 +142,11 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init SimpleThreadBundle::~SimpleThreadBundle() { - for (size_t i = 0; i < _signals.size(); ++i) { - _signals[i].cancel(); + for (auto & _signal : _signals) { + _signal.cancel(); } - for (size_t i = 0; i < _workers.size(); ++i) { - _workers[i]->thread.join(); + for (const auto & _worker : _workers) { + _worker->thread.join(); } } diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index 536474d9300..5dde0575a74 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -21,7 +21,7 @@ struct Work { Runnable* const* targets; size_t cnt; CountDownLatch *latch; - Work() : targets(nullptr), cnt(0), latch(0) {} + Work() : targets(nullptr), cnt(0), latch(nullptr) {} }; /** @@ -31,7 +31,7 @@ struct Part { const Work &work; size_t offset; Part(const Work &w, size_t o) : work(w), offset(o) {} - bool valid() { return (offset < work.cnt); } + bool valid() const noexcept { return (offset < work.cnt); } void perform() { if (valid()) { work.targets[offset]->run(); @@ -51,7 +51,7 @@ struct Signal { Signal() noexcept; Signal(Signal &&) noexcept = default; ~Signal(); - size_t wait(size_t &localGen) { + size_t wait(size_t &localGen) const { std::unique_lock guard(*monitor); while (localGen == generation) { cond->wait(guard); @@ -103,9 +103,22 @@ public: std::vector<SimpleThreadBundle*> _bundles; public: + class Guard { + public: + explicit Guard(Pool & pool) : _bundle(pool.obtain()), _pool(pool) {} + Guard(const Guard &) = delete; + Guard & operator =(const Guard &) = delete; + ~Guard() { _pool.release(std::move(_bundle)); } + SimpleThreadBundle & bundle() { return *_bundle; } + private: + SimpleThreadBundle::UP _bundle; + Pool &_pool; + }; Pool(size_t bundleSize, init_fun_t init_fun); - Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {} + explicit Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {} ~Pool(); + Guard getBundle() { return Guard(*this); } + //TODO Make private SimpleThreadBundle::UP obtain(); void release(SimpleThreadBundle::UP bundle); }; @@ -126,10 +139,12 @@ private: Runnable::UP _hook; public: - SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST); - SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST) + SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy); + SimpleThreadBundle(size_t size, Strategy strategy) : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {} - ~SimpleThreadBundle(); + explicit SimpleThreadBundle(size_t size) + : SimpleThreadBundle(size, USE_SIGNAL_LIST) {} + ~SimpleThreadBundle() override; size_t size() const override; using ThreadBundle::run; void run(Runnable* const* targets, size_t cnt) override; |