summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-09-20 21:20:01 +0200
committerGitHub <noreply@github.com>2023-09-20 21:20:01 +0200
commit18bb86ef0aab3a9d5f43af1227731a93818b89ab (patch)
treef13c72019273aaa2d3a0ac0ae7c016451478a8dc
parent4e04bf3134a057983d6144aa8875aec169d57ba2 (diff)
parent27e95a666bb5f3066fd23b52e3b21df48dff164e (diff)
Merge pull request #28575 from vespa-engine/balder/softdoom-early
Balder/softdoom early
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp97
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/matcher.cpp4
-rw-r--r--vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp35
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp10
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h29
7 files changed, 102 insertions, 75 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
index 9db7eef840d..25b16909a42 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
@@ -12,18 +12,22 @@
LOG_SETUP(".proton.matchengine.matchengine");
+using search::engine::SearchRequest;
+using search::engine::SearchReply;
+using search::engine::SearchClient;
+
+using namespace search::fef::indexproperties;
+
namespace {
class SearchTask : public vespalib::Executor::Task {
private:
- proton::MatchEngine &_engine;
- search::engine::SearchRequest::Source _request;
- search::engine::SearchClient &_client;
+ proton::MatchEngine &_engine;
+ SearchRequest::Source _request;
+ SearchClient &_client;
public:
- SearchTask(proton::MatchEngine &engine,
- search::engine::SearchRequest::Source request,
- search::engine::SearchClient &client)
+ SearchTask(proton::MatchEngine &engine, SearchRequest::Source request, SearchClient &client)
: _engine(engine),
_request(std::move(request)),
_client(client)
@@ -100,13 +104,12 @@ MatchEngine::removeSearchHandler(const DocTypeName &docTypeName)
return _handlers.removeHandler(docTypeName);
}
-search::engine::SearchReply::UP
-MatchEngine::search(search::engine::SearchRequest::Source request,
- search::engine::SearchClient &client)
+SearchReply::UP
+MatchEngine::search(SearchRequest::Source request, SearchClient &client)
{
// We continue to allow searches if the node is in Maintenance mode
if (_closed || (!_nodeUp && !_nodeMaintenance.load(std::memory_order_relaxed))) {
- auto ret = std::make_unique<search::engine::SearchReply>();
+ auto ret = std::make_unique<SearchReply>();
ret->setDistributionKey(_distributionKey);
// TODO: Notify closed.
@@ -120,43 +123,51 @@ MatchEngine::search(search::engine::SearchRequest::Source request,
return performSearch(std::move(request));
}
-std::unique_ptr<search::engine::SearchReply>
-MatchEngine::performSearch(search::engine::SearchRequest::Source req)
+std::unique_ptr<SearchReply>
+MatchEngine::doSearch(const SearchRequest & searchRequest) {
+ if (searchRequest.expired()) {
+ vespalib::Issue::report("Query timed out in the query Q.");
+ return std::make_unique<SearchReply>();
+ }
+ // 3 is the minimum level required for backend tracing.
+ searchRequest.setTraceLevel(trace::Level::lookup(searchRequest.propertiesMap.modelOverrides(),
+ searchRequest.trace().getLevel()), 3);
+ ISearchHandler::SP searchHandler;
+ auto threadBundle = _threadBundlePool.getBundle();
+ { // try to find the match handler corresponding to the specified search doc type
+ DocTypeName docTypeName(searchRequest);
+ std::lock_guard<std::mutex> guard(_lock);
+ searchHandler = _handlers.getHandler(docTypeName);
+ }
+ std::unique_ptr<SearchReply> ret;
+ if (searchHandler) {
+ ret = searchHandler->match(searchRequest, threadBundle.bundle());
+ } else {
+ HandlerMap<ISearchHandler>::Snapshot snapshot;
+ {
+ std::lock_guard<std::mutex> guard(_lock);
+ snapshot = _handlers.snapshot();
+ }
+ ret = (snapshot.valid())
+ ? snapshot.get()->match(searchRequest, threadBundle.bundle()) // use the first handler
+ : std::make_unique<SearchReply>();
+ }
+ if (searchRequest.expired()) {
+ vespalib::Issue::report("search request timed out; results may be incomplete");
+ }
+ return ret;
+}
+
+std::unique_ptr<SearchReply>
+MatchEngine::performSearch(SearchRequest::Source req)
{
auto my_issues = std::make_unique<search::UniqueIssues>();
auto capture_issues = vespalib::Issue::listen(*my_issues);
- auto ret = std::make_unique<search::engine::SearchReply>();
-
- const search::engine::SearchRequest * searchRequest = req.get();
- if (searchRequest) {
- // 3 is the minimum level required for backend tracing.
- searchRequest->setTraceLevel(search::fef::indexproperties::trace::Level::lookup(searchRequest->propertiesMap.modelOverrides(),
- searchRequest->trace().getLevel()), 3);
- ISearchHandler::SP searchHandler;
- vespalib::SimpleThreadBundle::UP threadBundle = _threadBundlePool.obtain();
- { // try to find the match handler corresponding to the specified search doc type
- DocTypeName docTypeName(*searchRequest);
- std::lock_guard<std::mutex> guard(_lock);
- searchHandler = _handlers.getHandler(docTypeName);
- }
- if (searchHandler) {
- ret = searchHandler->match(*searchRequest, *threadBundle);
- } else {
- HandlerMap<ISearchHandler>::Snapshot snapshot;
- {
- std::lock_guard<std::mutex> guard(_lock);
- snapshot = _handlers.snapshot();
- }
- if (snapshot.valid()) {
- ret = snapshot.get()->match(*searchRequest, *threadBundle); // use the first handler
- }
- }
- _threadBundlePool.release(std::move(threadBundle));
- if (searchRequest->expired()) {
- vespalib::Issue::report("search request timed out; results may be incomplete");
- }
- }
+ const SearchRequest * searchRequest = req.get();
+ auto ret = (searchRequest)
+ ? doSearch(*searchRequest)
+ : std::make_unique<SearchReply>();
ret->request = req.release();
if (_forward_issues) {
ret->my_issues = std::move(my_issues);
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
index 2979213979f..52ca419c50c 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h
@@ -28,6 +28,7 @@ private:
std::atomic<bool> _nodeUp;
std::atomic<bool> _nodeMaintenance;
+ std::unique_ptr<search::engine::SearchReply> doSearch(const search::engine::SearchRequest & searchRequest);
public:
/**
* Convenience typedefs.
diff --git a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
index a353d4816f6..f62f4c60a6c 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/match_tools.cpp
@@ -187,6 +187,7 @@ MatchToolsFactory(QueryLimiter & queryLimiter,
_diversityParams(),
_valid(false)
{
+ if (doom.soft_doom()) return;
auto trace = root_trace.make_trace();
trace.addEvent(4, "Start query setup");
_query.setWhiteListBlueprint(metaStore.createWhiteListBlueprint());
diff --git a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
index 4a4a021d6d5..eef0eb48738 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/matcher.cpp
@@ -278,6 +278,10 @@ Matcher::match(const SearchRequest &request, vespalib::ThreadBundle &threadBundl
if (!mtf->valid()) {
return reply;
}
+ if (mtf->get_request_context().getDoom().soft_doom()) {
+ vespalib::Issue::report("Search request soft doomed during query setup and initialization.");
+ return reply;
+ }
const Properties & rankProperties = request.propertiesMap.rankProperties();
uint32_t heapSize = HeapSize::lookup(rankProperties, _rankSetup->getHeapSize());
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
index e451f1e033d..b155bb02d42 100644
--- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
+++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
@@ -160,15 +160,11 @@ TEST("require that all strategies work with variable number of threads and targe
}
TEST_F("require that bundle pool gives out bundles", SimpleThreadBundle::Pool(5)) {
- SimpleThreadBundle::UP b1 = f1.obtain();
- SimpleThreadBundle::UP b2 = f1.obtain();
- ASSERT_TRUE(b1.get() != 0);
- ASSERT_TRUE(b2.get() != 0);
- EXPECT_EQUAL(5u, b1->size());
- EXPECT_EQUAL(5u, b2->size());
- EXPECT_FALSE(b1.get() == b2.get());
- f1.release(std::move(b1));
- f1.release(std::move(b2));
+ auto b1 = f1.getBundle();
+ auto b2 = f1.getBundle();
+ EXPECT_EQUAL(5u, b1.bundle().size());
+ EXPECT_EQUAL(5u, b2.bundle().size());
+ EXPECT_FALSE(&b1.bundle() == &b2.bundle());
}
TEST_F("require that bundles do not need to be put back on the pool", SimpleThreadBundle::Pool(5)) {
@@ -178,20 +174,20 @@ TEST_F("require that bundles do not need to be put back on the pool", SimpleThre
}
TEST_F("require that bundle pool reuses bundles", SimpleThreadBundle::Pool(5)) {
- SimpleThreadBundle::UP bundle = f1.obtain();
- SimpleThreadBundle *ptr = bundle.get();
- f1.release(std::move(bundle));
- bundle = f1.obtain();
- EXPECT_EQUAL(ptr, bundle.get());
+ SimpleThreadBundle *ptr;
+ {
+ ptr = &f1.getBundle().bundle();
+ }
+ auto bundle = f1.getBundle();
+ EXPECT_EQUAL(ptr, &bundle.bundle());
}
TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThreadBundle::Pool(3),
std::vector<SimpleThreadBundle*>(num_threads, 0))
{
- SimpleThreadBundle::UP bundle = f1.obtain();
- ASSERT_TRUE(bundle.get() != 0);
- EXPECT_EQUAL(3u, bundle->size());
- f2[thread_id] = bundle.get();
+ SimpleThreadBundle::Pool::Guard bundle = f1.getBundle();
+ EXPECT_EQUAL(3u, bundle.bundle().size());
+ f2[thread_id] = &bundle.bundle();
TEST_BARRIER();
if (thread_id == 0) {
for (size_t i = 0; i < num_threads; ++i) {
@@ -201,13 +197,12 @@ TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThr
}
}
TEST_BARRIER();
- f1.release(std::move(bundle));
}
struct Filler {
int stuff;
Filler() : stuff(0) {}
- virtual ~Filler() {}
+ virtual ~Filler() = default;
};
struct Proxy : Filler, Runnable {
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 958bb58f34a..4a588709c60 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -87,7 +87,7 @@ SimpleThreadBundle::Pool::obtain()
return ret;
}
}
- return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun);
+ return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun, USE_SIGNAL_LIST);
}
void
@@ -142,11 +142,11 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init
SimpleThreadBundle::~SimpleThreadBundle()
{
- for (size_t i = 0; i < _signals.size(); ++i) {
- _signals[i].cancel();
+ for (auto & _signal : _signals) {
+ _signal.cancel();
}
- for (size_t i = 0; i < _workers.size(); ++i) {
- _workers[i]->thread.join();
+ for (const auto & _worker : _workers) {
+ _worker->thread.join();
}
}
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index 536474d9300..5dde0575a74 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -21,7 +21,7 @@ struct Work {
Runnable* const* targets;
size_t cnt;
CountDownLatch *latch;
- Work() : targets(nullptr), cnt(0), latch(0) {}
+ Work() : targets(nullptr), cnt(0), latch(nullptr) {}
};
/**
@@ -31,7 +31,7 @@ struct Part {
const Work &work;
size_t offset;
Part(const Work &w, size_t o) : work(w), offset(o) {}
- bool valid() { return (offset < work.cnt); }
+ bool valid() const noexcept { return (offset < work.cnt); }
void perform() {
if (valid()) {
work.targets[offset]->run();
@@ -51,7 +51,7 @@ struct Signal {
Signal() noexcept;
Signal(Signal &&) noexcept = default;
~Signal();
- size_t wait(size_t &localGen) {
+ size_t wait(size_t &localGen) const {
std::unique_lock guard(*monitor);
while (localGen == generation) {
cond->wait(guard);
@@ -103,9 +103,22 @@ public:
std::vector<SimpleThreadBundle*> _bundles;
public:
+ class Guard {
+ public:
+ explicit Guard(Pool & pool) : _bundle(pool.obtain()), _pool(pool) {}
+ Guard(const Guard &) = delete;
+ Guard & operator =(const Guard &) = delete;
+ ~Guard() { _pool.release(std::move(_bundle)); }
+ SimpleThreadBundle & bundle() { return *_bundle; }
+ private:
+ SimpleThreadBundle::UP _bundle;
+ Pool &_pool;
+ };
Pool(size_t bundleSize, init_fun_t init_fun);
- Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {}
+ explicit Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {}
~Pool();
+ Guard getBundle() { return Guard(*this); }
+ //TODO Make private
SimpleThreadBundle::UP obtain();
void release(SimpleThreadBundle::UP bundle);
};
@@ -126,10 +139,12 @@ private:
Runnable::UP _hook;
public:
- SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST);
- SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST)
+ SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy);
+ SimpleThreadBundle(size_t size, Strategy strategy)
: SimpleThreadBundle(size, Runnable::default_init_function, strategy) {}
- ~SimpleThreadBundle();
+ explicit SimpleThreadBundle(size_t size)
+ : SimpleThreadBundle(size, USE_SIGNAL_LIST) {}
+ ~SimpleThreadBundle() override;
size_t size() const override;
using ThreadBundle::run;
void run(Runnable* const* targets, size_t cnt) override;