diff options
5 files changed, 105 insertions, 60 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java index df87de2a12b..f0303615e3d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java @@ -57,6 +57,11 @@ public final class DocsumDefinitionSet { return ds; } + /** Do we have a summary definition with the given name */ + public boolean hasDocsum(String summaryClass) { + return definitionsByName.containsKey(summaryClass); + } + /** * Makes data available for decoding for the given hit. * diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index fe74180cad3..7a5ef94069d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -65,6 +65,11 @@ public class RpcProtobufFillInvoker extends FillInvoker { @Override protected void sendFillRequest(Result result, String summaryClass) { + if (! documentDb.getDocsumDefinitionSet().hasDocsum(summaryClass)) { + // TODO Vespa 8: + // throw new IllegalArgumentException("invalid summary="+summaryClass); + log.fine("invalid presentation.summary="+summaryClass); + } ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf"); diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp index 82d8521d6db..cfc2e722732 100644 --- a/metrics/src/tests/metricmanagertest.cpp +++ b/metrics/src/tests/metricmanagertest.cpp @@ -13,11 +13,15 @@ #include <vespa/vespalib/util/xmlstream.h> #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/data/simple_buffer.h> +#include <vespa/vespalib/util/atomic.h> +#include <mutex> #include <thread> #include <vespa/log/log.h> LOG_SETUP(".test.metricmanager"); +using namespace vespalib::atomic; + namespace metrics { struct MetricManagerTest : public ::testing::Test { @@ -363,10 +367,14 @@ TEST_F(MetricManagerTest, test_consumer_visitor) namespace { -struct FakeTimer : public MetricManager::Timer { - time_t _time; +class FakeTimer : public MetricManager::Timer { + std::atomic<time_t> _time; +public: FakeTimer(time_t startTime = 0) : _time(startTime) {} - time_t getTime() const override { return _time; } + time_t getTime() const override { return load_relaxed(_time); } + void set_time(time_t t) noexcept { store_relaxed(_time, t); } + // Not safe for multiple writers, only expected to be called by test. + void add_time(time_t t) noexcept { set_time(getTime() + t); } }; struct BriefValuePrinter : public MetricVisitor { @@ -516,7 +524,7 @@ TEST_F(MetricManagerTest, test_snapshots) mySet.val10.a.val1.addValue(7); mySet.val10.a.val2.addValue(2); mySet.val10.b.val1.addValue(1); - timer->_time += 5 * 60; + timer->add_time(5 * 60); ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60); ASSERT_VALUES(mm, 5 * 60, "2,4,4,1,7,9,1,1,8,2,10"); ASSERT_VALUES(mm, 60 * 60, ""); @@ -530,17 +538,15 @@ TEST_F(MetricManagerTest, test_snapshots) mySet.val10.a.val1.addValue(8); mySet.val10.a.val2.addValue(3); mySet.val10.b.val1.addValue(2); - timer->_time += 5 * 60; + timer->add_time(5 * 60); ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * 2); ASSERT_VALUES(mm, 5 * 60, "4,5,5,1,8,11,2,2,10,3,13"); ASSERT_VALUES(mm, 60 * 60, ""); ASSERT_VALUES(mm, 0 * 60, "4,5,5,2,8,11,2,2,10,3,13"); - //std::cerr << dumpAllSnapshots(mm, "snapper") << "\n"; - // Adding another five minute period where nothing have happened. // Metric for last 5 minutes should be 0. - timer->_time += 5 * 60; + timer->add_time(5 * 60); ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * 3); ASSERT_VALUES(mm, 5 * 60, "0,0,0,0,0,0,0,0,0,0,0"); ASSERT_VALUES(mm, 60 * 60, ""); @@ -551,7 +557,7 @@ TEST_F(MetricManagerTest, test_snapshots) mySet.val6.addValue(6); for (uint32_t i=0; i<9; ++i) { // 9 x 5 minutes. Avoid snapshot bumping // due to taking snapshots in the past - timer->_time += 5 * 60; + timer->add_time(5 * 60); ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * (4 + i)); } ASSERT_VALUES(mm, 5 * 60, "0,0,0,0,0,0,0,0,0,0,0"); @@ -599,7 +605,7 @@ TEST_F(MetricManagerTest, test_xml_output) mySet.val10.a.val2.addValue(2); mySet.val10.b.val1.addValue(1); - timer->_time = 1300; + timer->set_time(1300); takeSnapshots(mm, 1300); std::string expected( @@ -674,7 +680,7 @@ TEST_F(MetricManagerTest, test_json_output) mySet.val10.a.val2.addValue(2); mySet.val10.b.val1.addValue(1); - timer->_time = 1300; + timer->set_time(1300); takeSnapshots(mm, 1300); // Create json output @@ -769,7 +775,7 @@ struct MetricSnapshotTestFixture // Take snapshot of metric values from time 1000 to time 1300 void takeSnapshotsOnce() { - timer->_time = 1300; + timer->set_time(1300); test.takeSnapshots(manager, 1300); } @@ -1049,13 +1055,22 @@ TEST_F(MetricManagerTest, text_output_supports_dimensions) namespace { struct MyUpdateHook : public UpdateHook { std::ostringstream& _output; - FakeTimer& _timer; + std::mutex& _output_mutex; + FakeTimer& _timer; - MyUpdateHook(std::ostringstream& output, const char* name, + MyUpdateHook(std::ostringstream& output, + std::mutex& output_mutex, + const char* name, FakeTimer& timer) - : UpdateHook(name), _output(output), _timer(timer) {} + : UpdateHook(name), + _output(output), + _output_mutex(output_mutex), + _timer(timer) + {} + ~MyUpdateHook() override = default; void updateMetrics(const MetricLockGuard & ) override { + std::lock_guard lock(_output_mutex); // updateMetrics() called from metric manager thread _output << _timer.getTime() << ": " << getName() << " called\n"; } }; @@ -1063,6 +1078,7 @@ namespace { TEST_F(MetricManagerTest, test_update_hooks) { + std::mutex output_mutex; std::ostringstream output; FastOS_ThreadPool pool(256_Ki); FakeTimer* timer = new FakeTimer(1000); @@ -1075,9 +1091,9 @@ TEST_F(MetricManagerTest, test_update_hooks) mm.registerMetric(lockGuard, mySet.set); } - MyUpdateHook preInitShort(output, "BIS", *timer); - MyUpdateHook preInitLong(output, "BIL", *timer); - MyUpdateHook preInitInfinite(output, "BII", *timer); + MyUpdateHook preInitShort(output, output_mutex, "BIS", *timer); + MyUpdateHook preInitLong(output, output_mutex, "BIL", *timer); + MyUpdateHook preInitInfinite(output, output_mutex, "BII", *timer); mm.addMetricUpdateHook(preInitShort, 5); mm.addMetricUpdateHook(preInitLong, 300); mm.addMetricUpdateHook(preInitInfinite, 0); @@ -1097,55 +1113,55 @@ TEST_F(MetricManagerTest, test_update_hooks) pool); output << "Init done\n"; - MyUpdateHook postInitShort(output, "AIS", *timer); - MyUpdateHook postInitLong(output, "AIL", *timer); - MyUpdateHook postInitInfinite(output, "AII", *timer); + MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer); + MyUpdateHook postInitLong(output, output_mutex, "AIL", *timer); + MyUpdateHook postInitInfinite(output, output_mutex, "AII", *timer); mm.addMetricUpdateHook(postInitShort, 5); mm.addMetricUpdateHook(postInitLong, 400); mm.addMetricUpdateHook(postInitInfinite, 0); // After 5 seconds the short ones should get another. - timer->_time = 1006; + timer->set_time(1006); waitForTimeProcessed(mm, 1006); // After 4 more seconds the short ones should get another // since last update was a second late. (Stable periods, process time // should not affect how often they are updated) - timer->_time = 1010; + timer->set_time(1010); waitForTimeProcessed(mm, 1010); // Bumping considerably ahead, such that next update is in the past, // we should only get one update called in this period. - timer->_time = 1200; + timer->set_time(1200); waitForTimeProcessed(mm, 1200); // No updates at this time. - timer->_time = 1204; + timer->set_time(1204); waitForTimeProcessed(mm, 1204); // Give all hooks an update mm.updateMetrics(true); // Last update should not have interfered with periods - timer->_time = 1205; + timer->set_time(1205); waitForTimeProcessed(mm, 1205); // Time is just ahead of a snapshot. - timer->_time = 1299; + timer->set_time(1299); waitForTimeProcessed(mm, 1299); // At time 1300 we are at a 5 minute snapshot bump // All hooks should thus get an update. The one with matching period // should only get one - timer->_time = 1300; + timer->set_time(1300); waitForTimeProcessed(mm, 1300); // The snapshot time currently doesn't count for the metric at period // 400. It will get an event at this time. - timer->_time = 1450; + timer->set_time(1450); waitForTimeProcessed(mm, 1450); std::string expected( @@ -1179,8 +1195,11 @@ TEST_F(MetricManagerTest, test_update_hooks) "1450: AIS called\n" "1450: AIL called\n" ); - std::string actual(output.str()); - EXPECT_EQ(expected, actual); + { + std::lock_guard lock(output_mutex); // Need to ensure we observe all writes by metric mgr thread + std::string actual(output.str()); + EXPECT_EQ(expected, actual); + } } } diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp index 54d90a0f310..d7534514f41 100644 --- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp @@ -15,28 +15,47 @@ Runnable::Runnable() Runnable::~Runnable() { std::lock_guard monitorGuard(_stateLock); - assert(_state == NOT_RUNNING); + assert(getState() == NOT_RUNNING); } bool Runnable::start(FastOS_ThreadPool& pool) { std::unique_lock guard(_stateLock); - _stateCond.wait(guard, [&](){ return (_state != STOPPING);}); + _stateCond.wait(guard, [&](){ return (getState() != STOPPING);}); - if (_state != NOT_RUNNING) return false; - _state = STARTING; + if (getState() != NOT_RUNNING) return false; + set_state(STARTING); if (pool.NewThread(this) == nullptr) { throw vespalib::IllegalStateException("Failed starting a new thread", VESPA_STRLOC); } return true; } +void Runnable::set_state(State new_state) noexcept +{ + _state.store(new_state, std::memory_order_relaxed); +} + +bool Runnable::stopping() const noexcept +{ + State s(getState()); + return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag()); +} + +bool Runnable::running() const noexcept +{ + State s(getState()); + // Must check break-flag too, as threadpool will use that to close + // down. + return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag())); +} + bool Runnable::stop() { std::lock_guard monitor(_stateLock); - if (_state == STOPPING || _state == NOT_RUNNING) return false; + if (getState() == STOPPING || getState() == NOT_RUNNING) return false; GetThread()->SetBreakFlag(); - _state = STOPPING; + set_state(STOPPING); return onStop(); } @@ -48,8 +67,8 @@ bool Runnable::onStop() bool Runnable::join() const { std::unique_lock guard(_stateLock); - assert ((_state != STARTING) && (_state != RUNNING)); - _stateCond.wait(guard, [&](){ return (_state == NOT_RUNNING);}); + assert ((getState() != STARTING) && (getState() != RUNNING)); + _stateCond.wait(guard, [&](){ return (getState() == NOT_RUNNING);}); return true; } @@ -57,21 +76,21 @@ void Runnable::Run(FastOS_ThreadInterface*, void*) { { std::lock_guard guard(_stateLock); - // Dont set state if its alreadyt at stopping. (And let run() be + // Don't set state if its already at stopping. (And let run() be // called even though about to stop for consistency) - if (_state == STARTING) { - _state = RUNNING; + if (getState() == STARTING) { + set_state(RUNNING); } } // By not catching exceptions, they should abort whole application. - // We should thus not need to have a catch all to set state to not + // We should thus not need to have a catch-all to set state to not // running. run(); { std::lock_guard guard(_stateLock); - _state = NOT_RUNNING; + set_state(NOT_RUNNING); _stateCond.notify_all(); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h index cf2befcc8d5..5ca344ea7ef 100644 --- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h +++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h @@ -19,6 +19,7 @@ #pragma once #include <vespa/fastos/thread.h> +#include <atomic> namespace document { @@ -29,16 +30,17 @@ public: private: mutable std::mutex _stateLock; mutable std::condition_variable _stateCond; - State _state; + std::atomic<State> _state; void Run(FastOS_ThreadInterface*, void*) override; + void set_state(State new_state) noexcept; // _stateLock must be held public: /** * Create a runnable. * @param pool If set, runnable will be started in constructor. */ Runnable(); - ~Runnable(); + ~Runnable() override; /** * Start this runnable. @@ -71,26 +73,21 @@ public: */ virtual void run() = 0; - /** Get the current state of this runnable. */ - State getState() const { return _state; } + /** + * Get the current state of this runnable. + * Thread safe (but relaxed) read; may be stale if done outside _stateLock. + */ + [[nodiscard]] State getState() const noexcept { + return _state.load(std::memory_order_relaxed); + } /** Check if system is in the process of stopping. */ - bool stopping() const - { - State s(getState()); - return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag()); - } + [[nodiscard]] bool stopping() const noexcept; /** * Checks if runnable is running or not. (Started is considered running) */ - bool running() const - { - State s(getState()); - // Must check breakflag too, as threadpool will use that to close - // down. - return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag())); - } + [[nodiscard]] bool running() const noexcept; }; } |