aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2016-06-17 18:12:01 +0200
committerGitHub <noreply@github.com>2016-06-17 18:12:01 +0200
commit8dd0077ab81f2f0051bfc8169b218c6a0a99133a (patch)
tree5ff91d21b32f364d455d880bfed7aa528e68609f
parent35ccd8e0806a645d62dfe5d2c1d78e4354b0898d (diff)
parenta6d0d397b173c13c4c82d373c017770005d4ba60 (diff)
Merge pull request #37 from yahoo/toregge/fix-flushengine-prune-strategy
Fixup flushengine prune strategy (tls pruning).
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine.cpp40
-rw-r--r--searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp2
-rw-r--r--searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp61
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/configstore.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp2
18 files changed, 113 insertions, 97 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp
index 59b86671a0d..cb195a9cdc6 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp
@@ -15,6 +15,7 @@ LOG_SETUP("flushengine_test");
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/test/insertion_operators.h>
#include <memory>
// --------------------------------------------------------------------------------
@@ -75,6 +76,7 @@ public:
search::SerialNum _oldestSerial;
search::SerialNum _currentSerial;
vespalib::CountDownLatch _done;
+ std::vector<search::SerialNum> _flushDoneHistory;
public:
typedef std::shared_ptr<SimpleHandler> SP;
@@ -85,7 +87,8 @@ public:
_targets(targets),
_oldestSerial(0),
_currentSerial(currentSerial),
- _done(targets.size())
+ _done(targets.size() + 1),
+ _flushDoneHistory()
{
// empty
}
@@ -112,12 +115,15 @@ public:
LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")",
getName().c_str(), oldestSerial);
_oldestSerial = std::max(_oldestSerial, oldestSerial);
+ _flushDoneHistory.push_back(oldestSerial);
_done.countDown();
}
};
class SimpleTask : public searchcorespi::FlushTask {
+ search::SerialNum &_flushedSerial;
+ search::SerialNum &_currentSerial;
public:
vespalib::Gate &_start;
vespalib::Gate &_done;
@@ -126,8 +132,11 @@ public:
public:
SimpleTask(vespalib::Gate &start,
vespalib::Gate &done,
- vespalib::Gate *proceed)
- : _start(start), _done(done), _proceed(proceed)
+ vespalib::Gate *proceed,
+ search::SerialNum &flushedSerial,
+ search::SerialNum &currentSerial)
+ : _flushedSerial(flushedSerial), _currentSerial(currentSerial),
+ _start(start), _done(done), _proceed(proceed)
{
// empty
}
@@ -137,6 +146,7 @@ public:
if (_proceed != NULL) {
_proceed->await();
}
+ _flushedSerial = _currentSerial;
_done.countDown();
}
@@ -150,6 +160,7 @@ public:
class SimpleTarget : public test::DummyFlushTarget {
public:
search::SerialNum _flushedSerial;
+ search::SerialNum _currentSerial;
vespalib::Gate _proceed;
vespalib::Gate _initDone;
vespalib::Gate _taskStart;
@@ -162,6 +173,7 @@ public:
SimpleTarget(Task::UP task, const std::string &name) :
test::DummyFlushTarget(name),
_flushedSerial(0),
+ _currentSerial(0),
_proceed(),
_initDone(),
_taskStart(),
@@ -177,7 +189,8 @@ public:
_initDone(),
_taskStart(),
_taskDone(),
- _task(new SimpleTask(_taskStart, _taskDone, &_proceed))
+ _task(new SimpleTask(_taskStart, _taskDone, &_proceed,
+ _flushedSerial, _currentSerial))
{
if (proceedImmediately) {
_proceed.countDown();
@@ -190,8 +203,8 @@ public:
virtual SerialNum
getFlushedSerialNum() const override
{
- LOG(info, "SimpleTarget(%s)::getFlushedSerialNum()",
- getName().c_str());
+ LOG(info, "SimpleTarget(%s)::getFlushedSerialNum() = %" PRIu64,
+ getName().c_str(), _flushedSerial);
return _flushedSerial;
}
@@ -200,6 +213,7 @@ public:
{
LOG(info, "SimpleTarget(%s)::initFlush(%" PRIu64 ")",
getName().c_str(), currentSerial);
+ _currentSerial = currentSerial;
_initDone.countDown();
return std::move(_task);
}
@@ -340,7 +354,7 @@ struct Fixture
Fixture(uint32_t numThreads, uint32_t idleIntervalMS)
: tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()),
strategy(std::make_shared<SimpleStrategy>()),
- engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS, false)
+ engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS)
{
}
};
@@ -398,7 +412,9 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL))
f.engine.start();
EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT));
- EXPECT_EQUAL(20ul, handler->_oldestSerial);
+ EXPECT_EQUAL(25ul, handler->_oldestSerial);
+ EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
+ handler->_flushDoneHistory);
}
TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
@@ -423,9 +439,13 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL))
f.engine.start();
EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT));
- EXPECT_EQUAL(20ul, fooH->_oldestSerial);
+ EXPECT_EQUAL(25ul, fooH->_oldestSerial);
+ EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }),
+ fooH->_flushDoneHistory);
EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT));
- EXPECT_EQUAL(15ul, barH->_oldestSerial);
+ EXPECT_EQUAL(20ul, barH->_oldestSerial);
+ EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }),
+ barH->_flushDoneHistory);
}
TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL))
diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp
index ac3dbb8fed2..b3fd9a050a8 100644
--- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp
@@ -72,7 +72,7 @@ public:
targetType,
flushedSerial,
approxDiskBytes);
- _result.push_back(std::make_shared<FlushContext>(handler, target, 0, 0));
+ _result.push_back(std::make_shared<FlushContext>(handler, target, 0));
return *this;
}
ContextsBuilder &add(const vespalib::string &handlerName,
diff --git a/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp b/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp
index 2f4083228f9..6c235766de6 100644
--- a/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp
+++ b/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp
@@ -105,7 +105,7 @@ public:
return *this;
}
ContextBuilder &add(const IFlushTarget::SP &target, SerialNum lastSerial = 0) {
- FlushContext::SP ctx(new FlushContext(_handler, target, 0, lastSerial));
+ FlushContext::SP ctx(new FlushContext(_handler, target, lastSerial));
return add(ctx);
}
const FlushContext::List &list() const { return _list; }
@@ -282,22 +282,22 @@ requireThatWeCanOrderByTlsSize()
(handler1,
createTargetT("t2", TimeStamp(now.val() - 10 * TimeStamp::SEC),
1900),
- 2000, 2000)).
+ 2000)).
add(std::make_shared<FlushContext>
(handler2,
createTargetT("t1", TimeStamp(now.val() - 5 * TimeStamp::SEC),
1000),
- 2000, 2000)).
+ 2000)).
add(std::make_shared<FlushContext>
(handler1,
createTargetT("t4", TimeStamp(),
1000),
- 2000, 2000)).
+ 2000)).
add(std::make_shared<FlushContext>
(handler2,
createTargetT("t3", TimeStamp(now.val() - 15 * TimeStamp::SEC),
1900),
- 2000, 2000));
+ 2000));
{ // sum of tls sizes above limit, trigger sort order based on tls size
MemoryFlush flush({1000, 3 * gibi, 1.0, 1000, 1.0, 2000, TimeStamp(2 * TimeStamp::SEC)}, start);
EXPECT_TRUE(assertOrder(StringList().add("t4").add("t1").add("t2").add("t3"),
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
index 7f436a8d594..1624bc3b51c 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
@@ -10,13 +10,11 @@ namespace proton {
FlushContext::FlushContext(
const IFlushHandler::SP &handler,
const IFlushTarget::SP &target,
- search::SerialNum oldestFlushable,
search::SerialNum lastSerial)
: _name(createName(*handler, *target)),
_handler(handler),
_target(target),
_task(),
- _oldestFlushable(oldestFlushable),
_lastSerial(lastSerial)
{
// empty
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
index 9f2b557b3a9..837a99f153e 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
@@ -19,7 +19,6 @@ private:
IFlushHandler::SP _handler;
IFlushTarget::SP _target;
searchcorespi::FlushTask::UP _task;
- search::SerialNum _oldestFlushable;
search::SerialNum _lastSerial;
public:
@@ -43,7 +42,6 @@ public:
*/
FlushContext(const IFlushHandler::SP &handler,
const IFlushTarget::SP &target,
- search::SerialNum oldestFlushable,
search::SerialNum lastSerial);
/**
@@ -83,13 +81,6 @@ public:
const IFlushTarget::SP & getTarget() const { return _target; }
/**
- * Returns the oldest flushable serial number.
- *
- * @return The oldest flushable serial number
- */
- search::SerialNum getOldestFlushable() const { return _oldestFlushable; }
-
- /**
* Returns the last serial number.
*
* @return The last serial number
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index a948933ca81..f3013b4e5de 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -22,16 +22,13 @@ namespace {
search::SerialNum
findOldestFlushedSerial(const IFlushTarget::List &lst,
- const IFlushHandler &handler,
- const IFlushTarget *self)
+ const IFlushHandler &handler)
{
search::SerialNum ret(handler.getCurrentSerialNumber());
for (const IFlushTarget::SP & target : lst) {
- if (self != target.get()) {
- ret = std::min(ret, target->getFlushedSerialNum());
- }
+ ret = std::min(ret, target->getFlushedSerialNum());
}
- LOG(debug, "Oldest flushed serial for '%s' will be %" PRIu64 " after flush.", handler.getName().c_str(), ret);
+ LOG(debug, "Oldest flushed serial for '%s' is %" PRIu64 ".", handler.getName().c_str(), ret);
return ret;
}
@@ -55,11 +52,10 @@ FlushEngine::FlushInfo::FlushInfo(uint32_t taskId,
FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
tlsStatsFactory,
IFlushStrategy::SP strategy, uint32_t numThreads,
- uint32_t idleIntervalMS, bool enableAutoPrune)
+ uint32_t idleIntervalMS)
: _closed(false),
_maxConcurrent(numThreads),
_idleIntervalMS(idleIntervalMS),
- _enableAutoPrune(enableAutoPrune),
_taskId(0),
_threadPool(128 * 1024),
_strategy(strategy),
@@ -70,7 +66,8 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
_flushing(),
_strategyLock(),
_strategyMonitor(),
- _tlsStatsFactory(tlsStatsFactory)
+ _tlsStatsFactory(tlsStatsFactory),
+ _pendingPrune()
{
// empty
}
@@ -128,10 +125,10 @@ bool
FlushEngine::wait(size_t minimumWaitTimeIfReady)
{
MonitorGuard guard(_monitor);
- if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard)) {
+ if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) {
guard.wait(minimumWaitTimeIfReady);
}
- while ( ! canFlushMore(guard) ) {
+ while ( ! canFlushMore(guard) && _pendingPrune.empty()) {
guard.wait(1000); // broadcast when flush done
}
return !_closed;
@@ -146,6 +143,9 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
vespalib::string prevFlushName;
while (wait(shouldIdle ? _idleIntervalMS : 0)) {
shouldIdle = false;
+ if (prune()) {
+ continue; // Prune attempted on one or more handlers
+ }
prevFlushName = flushNextTarget(prevFlushName);
if ( ! prevFlushName.empty()) {
// Sleep at least 10 ms after a successful flush in order to avoid busy loop in case
@@ -154,23 +154,26 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
} else {
shouldIdle = true;
}
- if (_enableAutoPrune) {
- prune();
- }
LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
}
}
-void FlushEngine::prune()
+bool
+FlushEngine::prune()
{
- if (_flushing.empty()) {
+ std::set<IFlushHandler::SP> toPrune;
+ {
MonitorGuard guard(_monitor);
- for (const auto & it : _handlers) {
- IFlushHandler & handler(*it.second);
- IFlushTarget::List lst = handler.getFlushTargets();
- handler.flushDone(findOldestFlushedSerial(lst, handler, NULL));
+ if (_pendingPrune.empty()) {
+ return false;
}
+ _pendingPrune.swap(toPrune);
}
+ for (const auto &handler : toPrune) {
+ IFlushTarget::List lst = handler->getFlushTargets();
+ handler->flushDone(findOldestFlushedSerial(lst, *handler));
+ }
+ return true;
}
bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const
@@ -201,7 +204,6 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) {
ret.push_back(FlushContext::SP(new FlushContext(it.second,
IFlushTarget::SP(new CachedFlushTarget(target)),
- findOldestFlushedSerial(lst, handler, target.get()),
serial)));
} else {
LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.",
@@ -263,7 +265,7 @@ FlushEngine::flushAll(const FlushContext::List &lst)
ctx->getName().c_str(),
ctx->getTarget()->getFlushedSerialNum() + 1,
ctx->getHandler()->getCurrentSerialNumber());
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx, ctx->getOldestFlushable())));
+ _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
} else {
LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".",
ctx->getName().c_str(),
@@ -303,7 +305,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name)
name.c_str(), lst.first.size());
FastOS_Thread::Sleep(1000);
}
- _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx, ctx->getOldestFlushable())));
+ _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx)));
return ctx->getName();
}
@@ -340,6 +342,8 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
MonitorGuard guard(_monitor);
_flushing.erase(taskId);
+ assert(ctx.getHandler());
+ _pendingPrune.insert(ctx.getHandler());
guard.broadcast();
}
@@ -348,7 +352,12 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
const IFlushHandler::SP &flushHandler)
{
MonitorGuard guard(_monitor);
- return _handlers.putHandler(docTypeName, flushHandler);
+ IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler));
+ if (result) {
+ _pendingPrune.erase(result);
+ }
+ _pendingPrune.insert(flushHandler);
+ return std::move(result);
}
IFlushHandler::SP
@@ -362,7 +371,9 @@ IFlushHandler::SP
FlushEngine::removeFlushHandler(const DocTypeName &docTypeName)
{
MonitorGuard guard(_monitor);
- return _handlers.removeHandler(docTypeName);
+ IFlushHandler::SP result(_handlers.removeHandler(docTypeName));
+ _pendingPrune.erase(result);
+ return std::move(result);
}
FlushEngine::FlushMetaSet
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 215a8ac8de5..d8d77c11b8f 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -49,7 +49,6 @@ private:
bool _closed;
const uint32_t _maxConcurrent;
const uint32_t _idleIntervalMS;
- const bool _enableAutoPrune;
uint32_t _taskId;
FastOS_ThreadPool _threadPool;
IFlushStrategy::SP _strategy;
@@ -61,13 +60,14 @@ private:
vespalib::Lock _strategyLock; // serialize setStrategy calls
vespalib::Monitor _strategyMonitor;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
+ std::set<IFlushHandler::SP> _pendingPrune;
FlushContext::List getTargetList(bool includeFlushingTargets) const;
std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const;
FlushContext::SP initNextFlush(const FlushContext::List &lst);
vespalib::string flushNextTarget(const vespalib::string & name);
void flushAll(const FlushContext::List &lst);
- void prune();
+ bool prune();
uint32_t initFlush(const FlushContext &ctx);
uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
void flushDone(const FlushContext &ctx, uint32_t taskId);
@@ -93,15 +93,10 @@ public:
* @param strategy The flushing strategy to use.
* @param numThreads The number of worker threads to use.
* @param idleInterval The interval between when flushes are checked whne there are no one progressing.
- * @param enableAutoPrune Indicate if pruning shall be done even if there
- are no flushing happening. Turn off for some tests.
- Needed for pruning to be correct if one flush is started
- while another is in progress. In that case the pruning
- will be too conservative.
*/
FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
tlsStatsFactory,
- IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS, bool enableAutoPrune);
+ IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS);
/**
* Destructor. Waits for all pending tasks to complete.
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp
index fc692979753..8af59d3079f 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp
@@ -9,12 +9,10 @@ namespace proton {
FlushTask::FlushTask(uint32_t taskId,
FlushEngine &engine,
- const FlushContext::SP &ctx,
- search::SerialNum serial)
+ const FlushContext::SP &ctx)
: _taskId(taskId),
_engine(engine),
- _context(ctx),
- _serial(serial)
+ _context(ctx)
{
LOG_ASSERT(_context.get() != NULL);
}
@@ -34,7 +32,6 @@ FlushTask::run()
}
task->run();
task.reset();
- _context->getHandler()->flushDone(_serial);
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h
index fd27538ca4b..c10fb740410 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h
@@ -25,13 +25,10 @@ public:
* @param taskId The identifier used by IFlushStrategy.
* @param engine The running flush engine.
* @param ctx The context of the flush to perform.
- * @param serial The oldest unflushed serial available in the handler once
- * this task has been run.
*/
FlushTask(uint32_t taskId,
FlushEngine &engine,
- const FlushContext::SP &ctx,
- search::SerialNum serial);
+ const FlushContext::SP &ctx);
/**
* Destructor. Notifies the engine that the flush is done to prevent the
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
index 6912cbfde13..2d3eddf7af9 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
@@ -76,9 +76,10 @@ public:
* up to the given serial number can be pruned from the domain of this
* handler. This method is called by an arbitrary worker thread.
*
- * @param oldestSerial The oldest transaction that is still in use.
+ * @param flushedSerial Serial number flushed for all flush
+ * targets belonging to this handler.
*/
- virtual void flushDone(SerialNum oldestSerial) = 0;
+ virtual void flushDone(SerialNum flushedSerial) = 0;
/*
* This method is called to sync tls to stable media, up to and
diff --git a/searchcore/src/vespa/searchcore/proton/server/configstore.h b/searchcore/src/vespa/searchcore/proton/server/configstore.h
index ee6352d61d5..38d0dcfe7bd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/configstore.h
+++ b/searchcore/src/vespa/searchcore/proton/server/configstore.h
@@ -33,6 +33,12 @@ struct ConfigStore : FeedConfigStore {
SerialNum serialNum) = 0;
virtual void removeInvalid() = 0;
+ /**
+ * Perform prune after everything up to and including serialNum has been
+ * flushed to stable storage.
+ *
+ * @param serialNum The serial number flushed to stable storage.
+ */
virtual void prune(SerialNum serialNum) = 0;
virtual SerialNum getBestSerialNum() const = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index eaa166afe9c..fb5e426933a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -648,13 +648,13 @@ DocumentDB::onTransactionLogReplayDone()
void
-DocumentDB::onPerformPrune(SerialNum oldestSerial)
+DocumentDB::onPerformPrune(SerialNum flushedSerial)
{
if (!getAllowPrune()) {
assert(_state.getClosed());
return;
}
- _config_store->prune(oldestSerial);
+ _config_store->prune(flushedSerial);
}
@@ -760,9 +760,9 @@ DocumentDB::getFlushTargets()
}
void
-DocumentDB::flushDone(SerialNum oldestSerial)
+DocumentDB::flushDone(SerialNum flushedSerial)
{
- _feedHandler.flushDone(oldestSerial);
+ _feedHandler.flushDone(flushedSerial);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 0c49c2c0170..ebfede59497 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -201,7 +201,7 @@ private:
* Implements FeedHandler::IOwner
*/
virtual void onTransactionLogReplayDone() __attribute__((noinline));
- virtual void onPerformPrune(SerialNum oldestSerial);
+ virtual void onPerformPrune(SerialNum flushedSerial);
virtual bool isFeedBlockedByRejectedConfig();
/**
@@ -391,7 +391,7 @@ public:
getDocsums(const search::engine::DocsumRequest & request);
IFlushTarget::List getFlushTargets();
- void flushDone(SerialNum oldestSerial);
+ void flushDone(SerialNum flushedSerial);
virtual SerialNum
getCurrentSerialNumber() const
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 61e2a83bd37..6b7eadb2cb8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -432,32 +432,32 @@ FeedHandler::performEof()
void
-FeedHandler::performFlushDone(SerialNum oldestSerial)
+FeedHandler::performFlushDone(SerialNum flushedSerial)
{
assert(_writeService.master().isCurrentThread());
- // XXX: oldestSerial can go backwards when attribute vectors are
+ // XXX: flushedSerial can go backwards when attribute vectors are
// resurrected. This can be avoided if resurrected attribute vectors
// pretends to have been flushed at resurrect time.
- if (oldestSerial <= _prunedSerialNum) {
+ if (flushedSerial <= _prunedSerialNum) {
return; // Cannot unprune.
}
if (!_owner.getAllowPrune()) {
- _prunedSerialNum = oldestSerial;
+ _prunedSerialNum = flushedSerial;
_delayedPrune = true;
return;
}
_delayedPrune = false;
- performPrune(oldestSerial);
+ performPrune(flushedSerial);
}
void
-FeedHandler::performPrune(SerialNum oldestSerial)
+FeedHandler::performPrune(SerialNum flushedSerial)
{
try {
- tlsPrune(oldestSerial); // throws on error
- LOG(debug, "Pruned TLS to token %" PRIu64 ".", oldestSerial);
- _owner.onPerformPrune(oldestSerial);
+ tlsPrune(flushedSerial); // throws on error
+ LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial);
+ _owner.onPerformPrune(flushedSerial);
} catch (const vespalib::IllegalStateException & e) {
LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what());
}
@@ -606,7 +606,7 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial,
void
-FeedHandler::flushDone(SerialNum oldestSerial)
+FeedHandler::flushDone(SerialNum flushedSerial)
{
// Called by flush worker thread after performing a flush task
_writeService.master().execute(
@@ -614,7 +614,7 @@ FeedHandler::flushDone(SerialNum oldestSerial)
makeClosure(
this,
&FeedHandler::performFlushDone,
- oldestSerial)));
+ flushedSerial)));
}
void FeedHandler::changeToNormalFeedState(void) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 536013e0d02..a1bf80181db 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -62,7 +62,7 @@ public:
virtual void performWipeHistory() = 0;
virtual void onTransactionLogReplayDone() = 0;
virtual void enterRedoReprocessState() = 0;
- virtual void onPerformPrune(SerialNum oldestSerial) = 0;
+ virtual void onPerformPrune(SerialNum flushedSerial) = 0;
virtual bool isFeedBlockedByRejectedConfig() = 0;
virtual bool getAllowPrune() const = 0;
};
@@ -153,10 +153,10 @@ private:
* Used when flushing is done
*/
void
- performFlushDone(SerialNum oldestSerial);
+ performFlushDone(SerialNum flushedSerial);
void
- performPrune(SerialNum oldestSerial);
+ performPrune(SerialNum flushedSerial);
public:
void
@@ -240,10 +240,10 @@ public:
/**
* Called when a flush is done and allows pruning of the transaction log.
*
- * @param oldestSerial The oldest serial number that is still in use.
+ * @param flushedSerial serial number flushed for all relevant flush targets.
*/
void
- flushDone(SerialNum oldestSerial);
+ flushDone(SerialNum flushedSerial);
/**
* Used to flip between normal and recovery feed states.
diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
index 7736161ddbc..5599b245655 100644
--- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
@@ -36,9 +36,9 @@ FlushHandlerProxy::getCurrentSerialNumber(void) const
void
-FlushHandlerProxy::flushDone(SerialNum oldestSerial)
+FlushHandlerProxy::flushDone(SerialNum flushedSerial)
{
- _documentDB->flushDone(oldestSerial);
+ _documentDB->flushDone(flushedSerial);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
index 7e4cd1f3176..9f27c29c2bb 100644
--- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
@@ -28,7 +28,7 @@ public:
getCurrentSerialNumber(void) const;
virtual void
- flushDone(SerialNum oldestSerial);
+ flushDone(SerialNum flushedSerial);
virtual void
syncTls(SerialNum syncTo);
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 60c54bf1ef3..8f08cb233ee 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -300,7 +300,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
vespalib::chdir(protonConfig.basedir);
_tls->start();
_flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
- strategy, flush.maxconcurrent, flush.idleinterval*1000, true));
+ strategy, flush.maxconcurrent, flush.idleinterval*1000));
_fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL));
_fs4Server->setTCPNoDelay(true);
_metricsEngine->addExternalMetrics(_fs4Server->getMetrics());