aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2018-09-04 07:58:42 +0200
committerGitHub <noreply@github.com>2018-09-04 07:58:42 +0200
commite452bcca5d4068c2b52f310e09d90ce59b6d3562 (patch)
tree2237f59aecc12fbe7f4e24dcfa986971c1addc7a
parent0127443cbff7810ca3eb0efa9ca0c52829652833 (diff)
parent3e52a5f684a33bcdf8e7ab66e05e4f2db8357168 (diff)
Merge pull request #6775 from vespa-engine/revert-6771-geirst/fix-calc-of-oldest-flushed-serial-in-flush-engine
Revert "Geirst/fix calc of oldest flushed serial in flush engine"
-rw-r--r--searchcore/src/tests/proton/flushengine/CMakeLists.txt2
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine.cpp (renamed from searchcore/src/tests/proton/flushengine/flushengine_test.cpp)233
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/eventlogger.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp41
5 files changed, 118 insertions, 180 deletions
diff --git a/searchcore/src/tests/proton/flushengine/CMakeLists.txt b/searchcore/src/tests/proton/flushengine/CMakeLists.txt
index 6e8df3c9b7f..826c9b2390f 100644
--- a/searchcore/src/tests/proton/flushengine/CMakeLists.txt
+++ b/searchcore/src/tests/proton/flushengine/CMakeLists.txt
@@ -1,7 +1,7 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_executable(searchcore_flushengine_test_app TEST
SOURCES
- flushengine_test.cpp
+ flushengine.cpp
DEPENDS
searchcore_flushengine
searchcore_pcommon
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp
index f668072b9fd..d1a98f1b7d3 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp
@@ -3,15 +3,15 @@
#include <vespa/searchcore/proton/flushengine/cachedflushtarget.h>
#include <vespa/searchcore/proton/flushengine/flush_engine_explorer.h>
#include <vespa/searchcore/proton/flushengine/flushengine.h>
-#include <vespa/searchcore/proton/flushengine/i_tls_stats_factory.h>
#include <vespa/searchcore/proton/flushengine/threadedflushtarget.h>
#include <vespa/searchcore/proton/flushengine/tls_stats_map.h>
+#include <vespa/searchcore/proton/flushengine/i_tls_stats_factory.h>
#include <vespa/searchcore/proton/server/igetserialnum.h>
#include <vespa/searchcore/proton/test/dummy_flush_handler.h>
#include <vespa/searchcore/proton/test/dummy_flush_target.h>
+#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/test/insertion_operators.h>
-#include <vespa/vespalib/testkit/testapp.h>
#include <mutex>
#include <chrono>
@@ -42,6 +42,7 @@ public:
SimpleExecutor()
: _done()
{
+ // empty
}
Task::UP
@@ -112,7 +113,7 @@ public:
}
};
-using Targets = std::vector<IFlushTarget::SP>;
+typedef std::vector<IFlushTarget::SP> Targets;
using FlushDoneHistory = std::vector<search::SerialNum>;
@@ -140,6 +141,7 @@ public:
_done(targets.size()),
_flushDoneHistory()
{
+ // empty
}
search::SerialNum
@@ -217,6 +219,7 @@ public:
: _flushedSerial(flushedSerial), _currentSerial(currentSerial),
_start(start), _done(done), _proceed(proceed)
{
+ // empty
}
void run() override {
@@ -245,24 +248,8 @@ public:
vespalib::Gate _taskDone;
Task::UP _task;
-protected:
- SimpleTarget(const std::string &name, const Type &type, search::SerialNum flushedSerial = 0, bool proceedImmediately = true) :
- test::DummyFlushTarget(name, type, Component::OTHER),
- _flushedSerial(flushedSerial),
- _proceed(),
- _initDone(),
- _taskStart(),
- _taskDone(),
- _task(std::make_unique<SimpleTask>(_taskStart, _taskDone, &_proceed,
- _flushedSerial, _currentSerial))
- {
- if (proceedImmediately) {
- _proceed.countDown();
- }
- }
-
public:
- using SP = std::shared_ptr<SimpleTarget>;
+ typedef std::shared_ptr<SimpleTarget> SP;
SimpleTarget(Task::UP task, const std::string &name) :
test::DummyFlushTarget(name),
@@ -276,14 +263,24 @@ public:
{
}
+ SimpleTarget(const std::string &name, search::SerialNum flushedSerial = 0, bool proceedImmediately = true) :
+ test::DummyFlushTarget(name),
+ _flushedSerial(flushedSerial),
+ _proceed(),
+ _initDone(),
+ _taskStart(),
+ _taskDone(),
+ _task(new SimpleTask(_taskStart, _taskDone, &_proceed,
+ _flushedSerial, _currentSerial))
+ {
+ if (proceedImmediately) {
+ _proceed.countDown();
+ }
+ }
SimpleTarget(search::SerialNum flushedSerial = 0, bool proceedImmediately = true)
: SimpleTarget("anon", flushedSerial, proceedImmediately)
{ }
- SimpleTarget(const std::string &name, search::SerialNum flushedSerial = 0, bool proceedImmediately = true)
- : SimpleTarget(name, Type::OTHER, flushedSerial, proceedImmediately)
- { }
-
virtual Time
getLastFlushTime() const override { return fastos::ClockSystem::now(); }
@@ -307,13 +304,6 @@ public:
};
-class GCTarget : public SimpleTarget {
-public:
- GCTarget(const vespalib::string &name, search::SerialNum flushedSerial)
- : SimpleTarget(name, Type::GC, flushedSerial)
- {}
-};
-
class AssertedTarget : public SimpleTarget {
public:
mutable bool _mgain;
@@ -376,7 +366,10 @@ public:
public:
typedef std::shared_ptr<SimpleStrategy> SP;
- SimpleStrategy() {}
+ SimpleStrategy()
+ {
+ // empty
+ }
uint32_t
indexOf(const IFlushTarget::SP &target) const
@@ -456,14 +449,6 @@ struct Fixture
{
}
- void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) {
- engine.putFlushHandler(DocTypeName(docTypeName), handler);
- }
-
- void addTargetToStrategy(IFlushTarget::SP target) {
- strategy->_targets.push_back(std::move(target));
- }
-
std::shared_ptr<SimpleHandler>
addSimpleHandler(Targets targets)
{
@@ -486,17 +471,21 @@ struct Fixture
}
};
+
TEST_F("require that strategy controls flush target", Fixture(1, IINTERVAL))
{
vespalib::Gate fooG, barG;
std::vector<vespalib::string> order;
- auto foo = std::make_shared<SimpleTarget>(std::make_unique<AppendTask>("foo", order, fooG), "foo");
- auto bar = std::make_shared<SimpleTarget>(std::make_unique<AppendTask>("bar", order, barG), "bar");
- f.addTargetToStrategy(foo);
- f.addTargetToStrategy(bar);
-
- auto handler = std::make_shared<SimpleHandler>(Targets({bar, foo}), "anon");
- f.putFlushHandler("anon", handler);
+ FlushTask::UP fooT(new AppendTask("foo", order, fooG));
+ FlushTask::UP barT(new AppendTask("bar", order, barG));
+ SimpleTarget::SP foo(new SimpleTarget(std::move(fooT), "foo"));
+ SimpleTarget::SP bar(new SimpleTarget(std::move(barT), "bar"));
+ f.strategy->_targets.push_back(foo);
+ f.strategy->_targets.push_back(bar);
+
+ SimpleHandler::SP handler(new SimpleHandler({bar, foo}));
+ DocTypeName dtnvanon("anon");
+ f.engine.putFlushHandler(dtnvanon, handler);
f.engine.start();
EXPECT_TRUE(fooG.await(LONG_TIMEOUT));
@@ -513,20 +502,25 @@ TEST_F("require that zero handlers does not core", Fixture(2, 50))
TEST_F("require that zero targets does not core", Fixture(2, 50))
{
- f.putFlushHandler("foo", std::make_shared<SimpleHandler>(Targets(), "foo"));
- f.putFlushHandler("bar", std::make_shared<SimpleHandler>(Targets(), "bar"));
+ DocTypeName dtnvfoo("foo");
+ DocTypeName dtnvbar("bar");
+ f.engine.putFlushHandler(dtnvfoo,
+ IFlushHandler::SP(new SimpleHandler({}, "foo")));
+ f.engine.putFlushHandler(dtnvbar,
+ IFlushHandler::SP(new SimpleHandler({}, "bar")));
f.engine.start();
}
TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL))
{
- auto foo = std::make_shared<SimpleTarget>("foo", 10);
- auto bar = std::make_shared<SimpleTarget>("bar", 20);
- f.addTargetToStrategy(foo);
- f.addTargetToStrategy(bar);
-
- auto handler = std::make_shared<SimpleHandler>(Targets({foo, bar}), "anon", 25);
- f.putFlushHandler("anon", handler);
+ SimpleTarget::SP foo(new SimpleTarget("foo", 10));
+ SimpleTarget::SP bar(new SimpleTarget("bar", 20));
+ f.strategy->_targets.push_back(foo);
+ f.strategy->_targets.push_back(bar);
+
+ SimpleHandler::SP handler(new SimpleHandler({foo, bar}, "anon", 25));
+ DocTypeName dtnvanon("anon");
+ f.engine.putFlushHandler(dtnvanon, handler);
f.engine.start();
EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT));
@@ -535,44 +529,24 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL))
EXPECT_EQUAL(FlushDoneHistory({ 10, 20, 25 }), handlerFlushDoneHistory);
}
-TEST_F("require that GC targets are not considered when oldest serial is found", Fixture(1, IINTERVAL))
-{
- auto foo = std::make_shared<SimpleTarget>("foo", 5);
- auto bar = std::make_shared<GCTarget>("bar", 10);
- auto baz = std::make_shared<SimpleTarget>("baz", 20);
- f.addTargetToStrategy(foo);
- f.addTargetToStrategy(bar);
- f.addTargetToStrategy(baz);
-
- auto handler = std::make_shared<SimpleHandler>(Targets({foo, bar, baz}), "handler", 25);
- f.putFlushHandler("handler", handler);
- f.engine.start();
-
- // The targets are flushed in sequence: 'foo', 'bar', 'baz'
- EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT));
- EXPECT_EQUAL(25ul, handler->_oldestSerial);
-
- // Before anything is flushed the oldest serial is 5.
- // After 'foo' has been flushed the oldest serial is 20 as GC target 'bar' is not considered.
- EXPECT_EQUAL(FlushDoneHistory({ 5, 20, 20, 25 }), handler->getFlushDoneHistory());
-}
-
TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
{
- auto fooT1 = std::make_shared<SimpleTarget>("fooT1", 10);
- auto fooT2 = std::make_shared<SimpleTarget>("fooT2", 20);
- auto barT1 = std::make_shared<SimpleTarget>("barT1", 5);
- auto barT2 = std::make_shared<SimpleTarget>("barT2", 15);
- f.addTargetToStrategy(fooT1);
- f.addTargetToStrategy(fooT2);
- f.addTargetToStrategy(barT1);
- f.addTargetToStrategy(barT2);
-
- auto fooH = std::make_shared<SimpleHandler>(Targets({fooT1, fooT2}), "fooH", 25);
- f.putFlushHandler("foo", fooH);
-
- auto barH = std::make_shared<SimpleHandler>(Targets({barT1, barT2}), "barH", 20);
- f.putFlushHandler("bar", barH);
+ SimpleTarget::SP fooT1(new SimpleTarget("fooT1", 10));
+ SimpleTarget::SP fooT2(new SimpleTarget("fooT2", 20));
+ SimpleTarget::SP barT1(new SimpleTarget("barT1", 5));
+ SimpleTarget::SP barT2(new SimpleTarget("barT2", 15));
+ f.strategy->_targets.push_back(fooT1);
+ f.strategy->_targets.push_back(fooT2);
+ f.strategy->_targets.push_back(barT1);
+ f.strategy->_targets.push_back(barT2);
+
+ SimpleHandler::SP fooH(new SimpleHandler({fooT1, fooT2}, "fooH", 25));
+ DocTypeName dtnvfoo("foo");
+ f.engine.putFlushHandler(dtnvfoo, fooH);
+
+ SimpleHandler::SP barH(new SimpleHandler({barT1, barT2}, "barH", 20));
+ DocTypeName dtnvbar("bar");
+ f.engine.putFlushHandler(dtnvbar, barH);
f.engine.start();
@@ -600,10 +574,11 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL))
{
- auto target = std::make_shared<SimpleTarget>();
- auto handler = std::make_shared<SimpleHandler>(Targets({target}));
+ SimpleTarget::SP target(new SimpleTarget());
+ SimpleHandler::SP handler(new SimpleHandler({target}));
target->_task = searchcorespi::FlushTask::UP();
- f.putFlushHandler("anon", handler);
+ DocTypeName dtnvanon("anon");
+ f.engine.putFlushHandler(dtnvanon, handler);
f.engine.start();
EXPECT_TRUE(target->_initDone.await(LONG_TIMEOUT));
@@ -614,9 +589,10 @@ TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL))
TEST_F("require that targets are flushed when nothing new to flush",
Fixture(2, IINTERVAL))
{
- auto target = std::make_shared<SimpleTarget>("anon", 5); // oldest unflushed serial num = 5
- auto handler = std::make_shared<SimpleHandler>(Targets({target}), "anon", 4); // current serial num = 4
- f.putFlushHandler("anon", handler);
+ SimpleTarget::SP target(new SimpleTarget("anon", 5)); // oldest unflushed serial num = 5
+ SimpleHandler::SP handler(new SimpleHandler({target}, "anon", 4)); // current serial num = 4
+ DocTypeName dtnvanon("anon");
+ f.engine.putFlushHandler(dtnvanon, handler);
f.engine.start();
EXPECT_TRUE(target->_initDone.await(LONG_TIMEOUT));
@@ -626,13 +602,14 @@ TEST_F("require that targets are flushed when nothing new to flush",
TEST_F("require that flushing targets are skipped", Fixture(2, IINTERVAL))
{
- auto foo = std::make_shared<SimpleTarget>("foo");
- auto bar = std::make_shared<SimpleTarget>("bar");
- f.addTargetToStrategy(foo);
- f.addTargetToStrategy(bar);
-
- auto handler = std::make_shared<SimpleHandler>(Targets({bar, foo}));
- f.putFlushHandler("anon", handler);
+ SimpleTarget::SP foo(new SimpleTarget("foo"));
+ SimpleTarget::SP bar(new SimpleTarget("bar"));
+ f.strategy->_targets.push_back(foo);
+ f.strategy->_targets.push_back(bar);
+
+ SimpleHandler::SP handler(new SimpleHandler({bar, foo}));
+ DocTypeName dtnvanon("anon");
+ f.engine.putFlushHandler(dtnvanon, handler);
f.engine.start();
EXPECT_TRUE(foo->_taskDone.await(LONG_TIMEOUT));
@@ -641,11 +618,12 @@ TEST_F("require that flushing targets are skipped", Fixture(2, IINTERVAL))
TEST_F("require that updated targets are not skipped", Fixture(2, IINTERVAL))
{
- auto target = std::make_shared<SimpleTarget>("target", 1);
- f.addTargetToStrategy(target);
+ SimpleTarget::SP target(new SimpleTarget("target", 1));
+ f.strategy->_targets.push_back(target);
- auto handler = std::make_shared<SimpleHandler>(Targets({target}), "handler", 0);
- f.putFlushHandler("handler", handler);
+ SimpleHandler::SP handler(new SimpleHandler({target}, "handler", 0));
+ DocTypeName dtnvhandler("handler");
+ f.engine.putFlushHandler(dtnvhandler, handler);
f.engine.start();
EXPECT_TRUE(target->_taskDone.await(LONG_TIMEOUT));
@@ -655,7 +633,8 @@ TEST("require that threaded target works")
{
SimpleExecutor executor;
SimpleGetSerialNum getSerialNum;
- auto target = std::make_shared<ThreadedFlushTarget>(executor, getSerialNum, std::make_shared<SimpleTarget>());
+ IFlushTarget::SP target(new SimpleTarget());
+ target.reset(new ThreadedFlushTarget(executor, getSerialNum, target));
EXPECT_FALSE(executor._done.await(SHORT_TIMEOUT));
EXPECT_TRUE(target->initFlush(0).get() != NULL);
@@ -664,7 +643,8 @@ TEST("require that threaded target works")
TEST("require that cached target works")
{
- auto target = std::make_shared<CachedFlushTarget>(std::make_shared<AssertedTarget>());
+ IFlushTarget::SP target(new AssertedTarget());
+ target.reset(new CachedFlushTarget(target));
for (uint32_t i = 0; i < 2; ++i) {
EXPECT_EQUAL(0l, target->getApproxMemoryGain().getBefore());
EXPECT_EQUAL(0l, target->getApproxMemoryGain().getAfter());
@@ -674,11 +654,12 @@ TEST("require that cached target works")
TEST_F("require that trigger flush works", Fixture(2, IINTERVAL))
{
- auto target = std::make_shared<SimpleTarget>("target", 1);
- f.addTargetToStrategy(target);
+ SimpleTarget::SP target(new SimpleTarget("target", 1));
+ f.strategy->_targets.push_back(target);
- auto handler = std::make_shared<SimpleHandler>(Targets({target}), "handler", 9);
- f.putFlushHandler("handler", handler);
+ SimpleHandler::SP handler(new SimpleHandler({target}, "handler", 9));
+ DocTypeName dtnvhandler("handler");
+ f.engine.putFlushHandler(dtnvhandler, handler);
f.engine.start();
f.engine.triggerFlush();
EXPECT_TRUE(target->_initDone.await(LONG_TIMEOUT));
@@ -712,13 +693,13 @@ assertThatHandlersInCurrentSet(FlushEngine & engine, const std::vector<const cha
TEST_F("require that concurrency works", Fixture(2, 1))
{
- auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
- auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
- auto target3 = std::make_shared<SimpleTarget>("target3", 3, false);
- auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3}), "handler", 9);
- f.putFlushHandler("handler", handler);
+ SimpleTarget::SP target1(new SimpleTarget("target1", 1, false));
+ SimpleTarget::SP target2(new SimpleTarget("target2", 2, false));
+ SimpleTarget::SP target3(new SimpleTarget("target3", 3, false));
+ SimpleHandler::SP handler(new SimpleHandler({target1, target2, target3}, "handler", 9));
+ DocTypeName dtnvhandler("handler");
+ f.engine.putFlushHandler(dtnvhandler, handler);
f.engine.start();
-
EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
@@ -733,11 +714,11 @@ TEST_F("require that concurrency works", Fixture(2, 1))
TEST_F("require that state explorer can list flush targets", Fixture(1, 1))
{
- auto target = std::make_shared<SimpleTarget>("target1", 100, false);
- f.putFlushHandler("handler",
- std::make_shared<SimpleHandler>(
- Targets({target, std::make_shared<SimpleTarget>("target2", 50, true)}),
- "handler", 9));
+ SimpleTarget::SP target = std::make_shared<SimpleTarget>("target1", 100, false);
+ f.engine.putFlushHandler(DocTypeName("handler"),
+ std::make_shared<SimpleHandler>(
+ Targets({target, std::make_shared<SimpleTarget>("target2", 50, true)}),
+ "handler", 9));
f.engine.start();
target->_initDone.await(LONG_TIMEOUT);
target->_taskStart.await(LONG_TIMEOUT);
diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
index 5966589d635..78f73742fed 100644
--- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
@@ -109,17 +109,13 @@ EventLogger::flushStart(const string &name, int64_t beforeMemory, int64_t afterM
}
void
-EventLogger::flushComplete(const string &name, int64_t elapsedTimeMs, SerialNum flushed,
+EventLogger::flushComplete(const string &name, int64_t elapsedTimeMs,
const string &outputPath, size_t outputPathElems)
{
JSONStringer jstr;
jstr.beginObject();
jstr.appendKey("name").appendString(name);
jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs);
- jstr.appendKey("serialnum")
- .beginObject()
- .appendKey("flushed").appendInt64(flushed)
- .endObject();
if (!outputPath.empty()) {
jstr.appendKey("output");
LogUtil::logDir(jstr, outputPath, outputPathElems);
@@ -128,20 +124,6 @@ EventLogger::flushComplete(const string &name, int64_t elapsedTimeMs, SerialNum
EV_STATE("flush.complete", jstr.toString().data());
}
-void
-EventLogger::flushPrune(const string &name, SerialNum oldestFlushed)
-{
- JSONStringer jstr;
- jstr.beginObject();
- jstr.appendKey("name").appendString(name);
- jstr.appendKey("serialnum")
- .beginObject()
- .appendKey("oldestflushed").appendInt64(oldestFlushed)
- .endObject();
- jstr.endObject();
- EV_STATE("flush.prune", jstr.toString().data());
-}
-
namespace {
void
diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
index 574e650732a..6ba8852496e 100644
--- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
+++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
@@ -41,10 +41,8 @@ public:
SerialNum current);
static void flushComplete(const string &name,
int64_t elapsedTimeMs,
- SerialNum flushed,
const string &outputPath,
size_t outputPathElems);
- static void flushPrune(const string &name, SerialNum oldestFlushed);
static void loadAttributeStart(const vespalib::string &subDbName, const vespalib::string &attrName);
static void loadAttributeComplete(const vespalib::string &subDbName,
const vespalib::string &attrName, int64_t elapsedTimeMs);
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index f7e0b7981bb..0d2c556b4d6 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -22,23 +22,15 @@ namespace proton {
namespace {
-std::pair<search::SerialNum, vespalib::string>
-findOldestFlushedTarget(const IFlushTarget::List &lst, const IFlushHandler &handler)
+search::SerialNum
+findOldestFlushedSerial(const IFlushTarget::List &lst, const IFlushHandler &handler)
{
- search::SerialNum oldestFlushedSerial = handler.getCurrentSerialNumber();
- vespalib::string oldestFlushedName = "null";
- for (const IFlushTarget::SP &target : lst) {
- if (target->getType() != IFlushTarget::Type::GC) {
- search::SerialNum targetFlushedSerial = target->getFlushedSerialNum();
- if (targetFlushedSerial <= oldestFlushedSerial) {
- oldestFlushedSerial = targetFlushedSerial;
- oldestFlushedName = target->getName();
- }
- }
+ search::SerialNum ret(handler.getCurrentSerialNumber());
+ for (const IFlushTarget::SP & target : lst) {
+ ret = std::min(ret, target->getFlushedSerialNum());
}
- LOG(debug, "Oldest flushed serial for handler='%s', target='%s': %" PRIu64 ".",
- handler.getName().c_str(), oldestFlushedName.c_str(), oldestFlushedSerial);
- return std::make_pair(oldestFlushedSerial, oldestFlushedName);
+ LOG(debug, "Oldest flushed serial for '%s' is %" PRIu64 ".", handler.getName().c_str(), ret);
+ return ret;
}
void
@@ -182,16 +174,6 @@ FlushEngine::Run(FastOS_ThreadInterface *, void *)
prune();
}
-namespace {
-
-vespalib::string
-createName(const IFlushHandler &handler, const vespalib::string &targetName)
-{
- return (handler.getName() + "." + targetName);
-}
-
-}
-
bool
FlushEngine::prune()
{
@@ -205,11 +187,7 @@ FlushEngine::prune()
}
for (const auto &handler : toPrune) {
IFlushTarget::List lst = handler->getFlushTargets();
- auto oldestFlushed = findOldestFlushedTarget(lst, *handler);
- if (LOG_WOULD_LOG(event)) {
- EventLogger::flushPrune(createName(*handler, oldestFlushed.second), oldestFlushed.first);
- }
- handler->flushDone(oldestFlushed.first);
+ handler->flushDone(findOldestFlushedSerial(lst, *handler));
}
return true;
}
@@ -355,8 +333,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
if (LOG_WOULD_LOG(event)) {
FlushStats stats = ctx.getTarget()->getLastFlushStats();
- EventLogger::flushComplete(ctx.getName(), duration.ms(), ctx.getTarget()->getFlushedSerialNum(),
- stats.getPath(), stats.getPathElementsToLog());
+ EventLogger::flushComplete(ctx.getName(), duration.ms(), stats.getPath(), stats.getPathElementsToLog());
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
std::lock_guard<std::mutex> guard(_lock);