summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java5
-rw-r--r--metrics/src/tests/metricmanagertest.cpp81
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp45
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.h29
5 files changed, 105 insertions, 60 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
index df87de2a12b..f0303615e3d 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
@@ -57,6 +57,11 @@ public final class DocsumDefinitionSet {
return ds;
}
+ /** Do we have a summary definition with the given name */
+ public boolean hasDocsum(String summaryClass) {
+ return definitionsByName.containsKey(summaryClass);
+ }
+
/**
* Makes data available for decoding for the given hit.
*
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index fe74180cad3..7a5ef94069d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -65,6 +65,11 @@ public class RpcProtobufFillInvoker extends FillInvoker {
@Override
protected void sendFillRequest(Result result, String summaryClass) {
+ if (! documentDb.getDocsumDefinitionSet().hasDocsum(summaryClass)) {
+ // TODO Vespa 8:
+ // throw new IllegalArgumentException("invalid summary="+summaryClass);
+ log.fine("invalid presentation.summary="+summaryClass);
+ }
ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp
index 82d8521d6db..cfc2e722732 100644
--- a/metrics/src/tests/metricmanagertest.cpp
+++ b/metrics/src/tests/metricmanagertest.cpp
@@ -13,11 +13,15 @@
#include <vespa/vespalib/util/xmlstream.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/data/simple_buffer.h>
+#include <vespa/vespalib/util/atomic.h>
+#include <mutex>
#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".test.metricmanager");
+using namespace vespalib::atomic;
+
namespace metrics {
struct MetricManagerTest : public ::testing::Test {
@@ -363,10 +367,14 @@ TEST_F(MetricManagerTest, test_consumer_visitor)
namespace {
-struct FakeTimer : public MetricManager::Timer {
- time_t _time;
+class FakeTimer : public MetricManager::Timer {
+ std::atomic<time_t> _time;
+public:
FakeTimer(time_t startTime = 0) : _time(startTime) {}
- time_t getTime() const override { return _time; }
+ time_t getTime() const override { return load_relaxed(_time); }
+ void set_time(time_t t) noexcept { store_relaxed(_time, t); }
+ // Not safe for multiple writers, only expected to be called by test.
+ void add_time(time_t t) noexcept { set_time(getTime() + t); }
};
struct BriefValuePrinter : public MetricVisitor {
@@ -516,7 +524,7 @@ TEST_F(MetricManagerTest, test_snapshots)
mySet.val10.a.val1.addValue(7);
mySet.val10.a.val2.addValue(2);
mySet.val10.b.val1.addValue(1);
- timer->_time += 5 * 60;
+ timer->add_time(5 * 60);
ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60);
ASSERT_VALUES(mm, 5 * 60, "2,4,4,1,7,9,1,1,8,2,10");
ASSERT_VALUES(mm, 60 * 60, "");
@@ -530,17 +538,15 @@ TEST_F(MetricManagerTest, test_snapshots)
mySet.val10.a.val1.addValue(8);
mySet.val10.a.val2.addValue(3);
mySet.val10.b.val1.addValue(2);
- timer->_time += 5 * 60;
+ timer->add_time(5 * 60);
ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * 2);
ASSERT_VALUES(mm, 5 * 60, "4,5,5,1,8,11,2,2,10,3,13");
ASSERT_VALUES(mm, 60 * 60, "");
ASSERT_VALUES(mm, 0 * 60, "4,5,5,2,8,11,2,2,10,3,13");
- //std::cerr << dumpAllSnapshots(mm, "snapper") << "\n";
-
// Adding another five minute period where nothing have happened.
// Metric for last 5 minutes should be 0.
- timer->_time += 5 * 60;
+ timer->add_time(5 * 60);
ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * 3);
ASSERT_VALUES(mm, 5 * 60, "0,0,0,0,0,0,0,0,0,0,0");
ASSERT_VALUES(mm, 60 * 60, "");
@@ -551,7 +557,7 @@ TEST_F(MetricManagerTest, test_snapshots)
mySet.val6.addValue(6);
for (uint32_t i=0; i<9; ++i) { // 9 x 5 minutes. Avoid snapshot bumping
// due to taking snapshots in the past
- timer->_time += 5 * 60;
+ timer->add_time(5 * 60);
ASSERT_PROCESS_TIME(mm, 1000 + 5 * 60 * (4 + i));
}
ASSERT_VALUES(mm, 5 * 60, "0,0,0,0,0,0,0,0,0,0,0");
@@ -599,7 +605,7 @@ TEST_F(MetricManagerTest, test_xml_output)
mySet.val10.a.val2.addValue(2);
mySet.val10.b.val1.addValue(1);
- timer->_time = 1300;
+ timer->set_time(1300);
takeSnapshots(mm, 1300);
std::string expected(
@@ -674,7 +680,7 @@ TEST_F(MetricManagerTest, test_json_output)
mySet.val10.a.val2.addValue(2);
mySet.val10.b.val1.addValue(1);
- timer->_time = 1300;
+ timer->set_time(1300);
takeSnapshots(mm, 1300);
// Create json output
@@ -769,7 +775,7 @@ struct MetricSnapshotTestFixture
// Take snapshot of metric values from time 1000 to time 1300
void takeSnapshotsOnce() {
- timer->_time = 1300;
+ timer->set_time(1300);
test.takeSnapshots(manager, 1300);
}
@@ -1049,13 +1055,22 @@ TEST_F(MetricManagerTest, text_output_supports_dimensions)
namespace {
struct MyUpdateHook : public UpdateHook {
std::ostringstream& _output;
- FakeTimer& _timer;
+ std::mutex& _output_mutex;
+ FakeTimer& _timer;
- MyUpdateHook(std::ostringstream& output, const char* name,
+ MyUpdateHook(std::ostringstream& output,
+ std::mutex& output_mutex,
+ const char* name,
FakeTimer& timer)
- : UpdateHook(name), _output(output), _timer(timer) {}
+ : UpdateHook(name),
+ _output(output),
+ _output_mutex(output_mutex),
+ _timer(timer)
+ {}
+ ~MyUpdateHook() override = default;
void updateMetrics(const MetricLockGuard & ) override {
+ std::lock_guard lock(_output_mutex); // updateMetrics() called from metric manager thread
_output << _timer.getTime() << ": " << getName() << " called\n";
}
};
@@ -1063,6 +1078,7 @@ namespace {
TEST_F(MetricManagerTest, test_update_hooks)
{
+ std::mutex output_mutex;
std::ostringstream output;
FastOS_ThreadPool pool(256_Ki);
FakeTimer* timer = new FakeTimer(1000);
@@ -1075,9 +1091,9 @@ TEST_F(MetricManagerTest, test_update_hooks)
mm.registerMetric(lockGuard, mySet.set);
}
- MyUpdateHook preInitShort(output, "BIS", *timer);
- MyUpdateHook preInitLong(output, "BIL", *timer);
- MyUpdateHook preInitInfinite(output, "BII", *timer);
+ MyUpdateHook preInitShort(output, output_mutex, "BIS", *timer);
+ MyUpdateHook preInitLong(output, output_mutex, "BIL", *timer);
+ MyUpdateHook preInitInfinite(output, output_mutex, "BII", *timer);
mm.addMetricUpdateHook(preInitShort, 5);
mm.addMetricUpdateHook(preInitLong, 300);
mm.addMetricUpdateHook(preInitInfinite, 0);
@@ -1097,55 +1113,55 @@ TEST_F(MetricManagerTest, test_update_hooks)
pool);
output << "Init done\n";
- MyUpdateHook postInitShort(output, "AIS", *timer);
- MyUpdateHook postInitLong(output, "AIL", *timer);
- MyUpdateHook postInitInfinite(output, "AII", *timer);
+ MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer);
+ MyUpdateHook postInitLong(output, output_mutex, "AIL", *timer);
+ MyUpdateHook postInitInfinite(output, output_mutex, "AII", *timer);
mm.addMetricUpdateHook(postInitShort, 5);
mm.addMetricUpdateHook(postInitLong, 400);
mm.addMetricUpdateHook(postInitInfinite, 0);
// After 5 seconds the short ones should get another.
- timer->_time = 1006;
+ timer->set_time(1006);
waitForTimeProcessed(mm, 1006);
// After 4 more seconds the short ones should get another
// since last update was a second late. (Stable periods, process time
// should not affect how often they are updated)
- timer->_time = 1010;
+ timer->set_time(1010);
waitForTimeProcessed(mm, 1010);
// Bumping considerably ahead, such that next update is in the past,
// we should only get one update called in this period.
- timer->_time = 1200;
+ timer->set_time(1200);
waitForTimeProcessed(mm, 1200);
// No updates at this time.
- timer->_time = 1204;
+ timer->set_time(1204);
waitForTimeProcessed(mm, 1204);
// Give all hooks an update
mm.updateMetrics(true);
// Last update should not have interfered with periods
- timer->_time = 1205;
+ timer->set_time(1205);
waitForTimeProcessed(mm, 1205);
// Time is just ahead of a snapshot.
- timer->_time = 1299;
+ timer->set_time(1299);
waitForTimeProcessed(mm, 1299);
// At time 1300 we are at a 5 minute snapshot bump
// All hooks should thus get an update. The one with matching period
// should only get one
- timer->_time = 1300;
+ timer->set_time(1300);
waitForTimeProcessed(mm, 1300);
// The snapshot time currently doesn't count for the metric at period
// 400. It will get an event at this time.
- timer->_time = 1450;
+ timer->set_time(1450);
waitForTimeProcessed(mm, 1450);
std::string expected(
@@ -1179,8 +1195,11 @@ TEST_F(MetricManagerTest, test_update_hooks)
"1450: AIS called\n"
"1450: AIL called\n"
);
- std::string actual(output.str());
- EXPECT_EQ(expected, actual);
+ {
+ std::lock_guard lock(output_mutex); // Need to ensure we observe all writes by metric mgr thread
+ std::string actual(output.str());
+ EXPECT_EQ(expected, actual);
+ }
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
index 54d90a0f310..d7534514f41 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
@@ -15,28 +15,47 @@ Runnable::Runnable()
Runnable::~Runnable() {
std::lock_guard monitorGuard(_stateLock);
- assert(_state == NOT_RUNNING);
+ assert(getState() == NOT_RUNNING);
}
bool Runnable::start(FastOS_ThreadPool& pool)
{
std::unique_lock guard(_stateLock);
- _stateCond.wait(guard, [&](){ return (_state != STOPPING);});
+ _stateCond.wait(guard, [&](){ return (getState() != STOPPING);});
- if (_state != NOT_RUNNING) return false;
- _state = STARTING;
+ if (getState() != NOT_RUNNING) return false;
+ set_state(STARTING);
if (pool.NewThread(this) == nullptr) {
throw vespalib::IllegalStateException("Failed starting a new thread", VESPA_STRLOC);
}
return true;
}
+void Runnable::set_state(State new_state) noexcept
+{
+ _state.store(new_state, std::memory_order_relaxed);
+}
+
+bool Runnable::stopping() const noexcept
+{
+ State s(getState());
+ return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag());
+}
+
+bool Runnable::running() const noexcept
+{
+ State s(getState());
+ // Must check break-flag too, as threadpool will use that to close
+ // down.
+ return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag()));
+}
+
bool Runnable::stop()
{
std::lock_guard monitor(_stateLock);
- if (_state == STOPPING || _state == NOT_RUNNING) return false;
+ if (getState() == STOPPING || getState() == NOT_RUNNING) return false;
GetThread()->SetBreakFlag();
- _state = STOPPING;
+ set_state(STOPPING);
return onStop();
}
@@ -48,8 +67,8 @@ bool Runnable::onStop()
bool Runnable::join() const
{
std::unique_lock guard(_stateLock);
- assert ((_state != STARTING) && (_state != RUNNING));
- _stateCond.wait(guard, [&](){ return (_state == NOT_RUNNING);});
+ assert ((getState() != STARTING) && (getState() != RUNNING));
+ _stateCond.wait(guard, [&](){ return (getState() == NOT_RUNNING);});
return true;
}
@@ -57,21 +76,21 @@ void Runnable::Run(FastOS_ThreadInterface*, void*)
{
{
std::lock_guard guard(_stateLock);
- // Dont set state if its alreadyt at stopping. (And let run() be
+ // Don't set state if its already at stopping. (And let run() be
// called even though about to stop for consistency)
- if (_state == STARTING) {
- _state = RUNNING;
+ if (getState() == STARTING) {
+ set_state(RUNNING);
}
}
// By not catching exceptions, they should abort whole application.
- // We should thus not need to have a catch all to set state to not
+ // We should thus not need to have a catch-all to set state to not
// running.
run();
{
std::lock_guard guard(_stateLock);
- _state = NOT_RUNNING;
+ set_state(NOT_RUNNING);
_stateCond.notify_all();
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
index cf2befcc8d5..5ca344ea7ef 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
@@ -19,6 +19,7 @@
#pragma once
#include <vespa/fastos/thread.h>
+#include <atomic>
namespace document {
@@ -29,16 +30,17 @@ public:
private:
mutable std::mutex _stateLock;
mutable std::condition_variable _stateCond;
- State _state;
+ std::atomic<State> _state;
void Run(FastOS_ThreadInterface*, void*) override;
+ void set_state(State new_state) noexcept; // _stateLock must be held
public:
/**
* Create a runnable.
* @param pool If set, runnable will be started in constructor.
*/
Runnable();
- ~Runnable();
+ ~Runnable() override;
/**
* Start this runnable.
@@ -71,26 +73,21 @@ public:
*/
virtual void run() = 0;
- /** Get the current state of this runnable. */
- State getState() const { return _state; }
+ /**
+ * Get the current state of this runnable.
+ * Thread safe (but relaxed) read; may be stale if done outside _stateLock.
+ */
+ [[nodiscard]] State getState() const noexcept {
+ return _state.load(std::memory_order_relaxed);
+ }
/** Check if system is in the process of stopping. */
- bool stopping() const
- {
- State s(getState());
- return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag());
- }
+ [[nodiscard]] bool stopping() const noexcept;
/**
* Checks if runnable is running or not. (Started is considered running)
*/
- bool running() const
- {
- State s(getState());
- // Must check breakflag too, as threadpool will use that to close
- // down.
- return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag()));
- }
+ [[nodiscard]] bool running() const noexcept;
};
}