diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2016-06-17 18:12:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-17 18:12:01 +0200 |
commit | 8dd0077ab81f2f0051bfc8169b218c6a0a99133a (patch) | |
tree | 5ff91d21b32f364d455d880bfed7aa528e68609f | |
parent | 35ccd8e0806a645d62dfe5d2c1d78e4354b0898d (diff) | |
parent | a6d0d397b173c13c4c82d373c017770005d4ba60 (diff) |
Merge pull request #37 from yahoo/toregge/fix-flushengine-prune-strategy
Fixup flushengine prune strategy (tls pruning).
18 files changed, 113 insertions, 97 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp index 59b86671a0d..cb195a9cdc6 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp @@ -15,6 +15,7 @@ LOG_SETUP("flushengine_test"); #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/test/insertion_operators.h> #include <memory> // -------------------------------------------------------------------------------- @@ -75,6 +76,7 @@ public: search::SerialNum _oldestSerial; search::SerialNum _currentSerial; vespalib::CountDownLatch _done; + std::vector<search::SerialNum> _flushDoneHistory; public: typedef std::shared_ptr<SimpleHandler> SP; @@ -85,7 +87,8 @@ public: _targets(targets), _oldestSerial(0), _currentSerial(currentSerial), - _done(targets.size()) + _done(targets.size() + 1), + _flushDoneHistory() { // empty } @@ -112,12 +115,15 @@ public: LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial); _oldestSerial = std::max(_oldestSerial, oldestSerial); + _flushDoneHistory.push_back(oldestSerial); _done.countDown(); } }; class SimpleTask : public searchcorespi::FlushTask { + search::SerialNum &_flushedSerial; + search::SerialNum &_currentSerial; public: vespalib::Gate &_start; vespalib::Gate &_done; @@ -126,8 +132,11 @@ public: public: SimpleTask(vespalib::Gate &start, vespalib::Gate &done, - vespalib::Gate *proceed) - : _start(start), _done(done), _proceed(proceed) + vespalib::Gate *proceed, + search::SerialNum &flushedSerial, + search::SerialNum ¤tSerial) + : _flushedSerial(flushedSerial), _currentSerial(currentSerial), + _start(start), _done(done), _proceed(proceed) { // empty } @@ -137,6 +146,7 @@ public: if (_proceed != NULL) { _proceed->await(); } + _flushedSerial = _currentSerial; _done.countDown(); } @@ -150,6 +160,7 @@ public: class SimpleTarget : public test::DummyFlushTarget { public: search::SerialNum _flushedSerial; + search::SerialNum _currentSerial; vespalib::Gate _proceed; vespalib::Gate _initDone; vespalib::Gate _taskStart; @@ -162,6 +173,7 @@ public: SimpleTarget(Task::UP task, const std::string &name) : test::DummyFlushTarget(name), _flushedSerial(0), + _currentSerial(0), _proceed(), _initDone(), _taskStart(), @@ -177,7 +189,8 @@ public: _initDone(), _taskStart(), _taskDone(), - _task(new SimpleTask(_taskStart, _taskDone, &_proceed)) + _task(new SimpleTask(_taskStart, _taskDone, &_proceed, + _flushedSerial, _currentSerial)) { if (proceedImmediately) { _proceed.countDown(); @@ -190,8 +203,8 @@ public: virtual SerialNum getFlushedSerialNum() const override { - LOG(info, "SimpleTarget(%s)::getFlushedSerialNum()", - getName().c_str()); + LOG(info, "SimpleTarget(%s)::getFlushedSerialNum() = %" PRIu64, + getName().c_str(), _flushedSerial); return _flushedSerial; } @@ -200,6 +213,7 @@ public: { LOG(info, "SimpleTarget(%s)::initFlush(%" PRIu64 ")", getName().c_str(), currentSerial); + _currentSerial = currentSerial; _initDone.countDown(); return std::move(_task); } @@ -340,7 +354,7 @@ struct Fixture Fixture(uint32_t numThreads, uint32_t idleIntervalMS) : tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()), strategy(std::make_shared<SimpleStrategy>()), - engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS, false) + engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS) { } }; @@ -398,7 +412,9 @@ TEST_F("require that oldest serial is found", Fixture(1, IINTERVAL)) f.engine.start(); EXPECT_TRUE(handler->_done.await(LONG_TIMEOUT)); - EXPECT_EQUAL(20ul, handler->_oldestSerial); + EXPECT_EQUAL(25ul, handler->_oldestSerial); + EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), + handler->_flushDoneHistory); } TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) @@ -423,9 +439,13 @@ TEST_F("require that oldest serial is found in group", Fixture(2, IINTERVAL)) f.engine.start(); EXPECT_TRUE(fooH->_done.await(LONG_TIMEOUT)); - EXPECT_EQUAL(20ul, fooH->_oldestSerial); + EXPECT_EQUAL(25ul, fooH->_oldestSerial); + EXPECT_EQUAL(std::vector<search::SerialNum>({ 10, 20, 25 }), + fooH->_flushDoneHistory); EXPECT_TRUE(barH->_done.await(LONG_TIMEOUT)); - EXPECT_EQUAL(15ul, barH->_oldestSerial); + EXPECT_EQUAL(20ul, barH->_oldestSerial); + EXPECT_EQUAL(std::vector<search::SerialNum>({ 5, 15, 20 }), + barH->_flushDoneHistory); } TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL)) diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp index ac3dbb8fed2..b3fd9a050a8 100644 --- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp +++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp @@ -72,7 +72,7 @@ public: targetType, flushedSerial, approxDiskBytes); - _result.push_back(std::make_shared<FlushContext>(handler, target, 0, 0)); + _result.push_back(std::make_shared<FlushContext>(handler, target, 0)); return *this; } ContextsBuilder &add(const vespalib::string &handlerName, diff --git a/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp b/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp index 2f4083228f9..6c235766de6 100644 --- a/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp +++ b/searchcore/src/tests/proton/server/memoryflush/memoryflush_test.cpp @@ -105,7 +105,7 @@ public: return *this; } ContextBuilder &add(const IFlushTarget::SP &target, SerialNum lastSerial = 0) { - FlushContext::SP ctx(new FlushContext(_handler, target, 0, lastSerial)); + FlushContext::SP ctx(new FlushContext(_handler, target, lastSerial)); return add(ctx); } const FlushContext::List &list() const { return _list; } @@ -282,22 +282,22 @@ requireThatWeCanOrderByTlsSize() (handler1, createTargetT("t2", TimeStamp(now.val() - 10 * TimeStamp::SEC), 1900), - 2000, 2000)). + 2000)). add(std::make_shared<FlushContext> (handler2, createTargetT("t1", TimeStamp(now.val() - 5 * TimeStamp::SEC), 1000), - 2000, 2000)). + 2000)). add(std::make_shared<FlushContext> (handler1, createTargetT("t4", TimeStamp(), 1000), - 2000, 2000)). + 2000)). add(std::make_shared<FlushContext> (handler2, createTargetT("t3", TimeStamp(now.val() - 15 * TimeStamp::SEC), 1900), - 2000, 2000)); + 2000)); { // sum of tls sizes above limit, trigger sort order based on tls size MemoryFlush flush({1000, 3 * gibi, 1.0, 1000, 1.0, 2000, TimeStamp(2 * TimeStamp::SEC)}, start); EXPECT_TRUE(assertOrder(StringList().add("t4").add("t1").add("t2").add("t3"), diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp index 7f436a8d594..1624bc3b51c 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp @@ -10,13 +10,11 @@ namespace proton { FlushContext::FlushContext( const IFlushHandler::SP &handler, const IFlushTarget::SP &target, - search::SerialNum oldestFlushable, search::SerialNum lastSerial) : _name(createName(*handler, *target)), _handler(handler), _target(target), _task(), - _oldestFlushable(oldestFlushable), _lastSerial(lastSerial) { // empty diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h index 9f2b557b3a9..837a99f153e 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h @@ -19,7 +19,6 @@ private: IFlushHandler::SP _handler; IFlushTarget::SP _target; searchcorespi::FlushTask::UP _task; - search::SerialNum _oldestFlushable; search::SerialNum _lastSerial; public: @@ -43,7 +42,6 @@ public: */ FlushContext(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, - search::SerialNum oldestFlushable, search::SerialNum lastSerial); /** @@ -83,13 +81,6 @@ public: const IFlushTarget::SP & getTarget() const { return _target; } /** - * Returns the oldest flushable serial number. - * - * @return The oldest flushable serial number - */ - search::SerialNum getOldestFlushable() const { return _oldestFlushable; } - - /** * Returns the last serial number. * * @return The last serial number diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index a948933ca81..f3013b4e5de 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -22,16 +22,13 @@ namespace { search::SerialNum findOldestFlushedSerial(const IFlushTarget::List &lst, - const IFlushHandler &handler, - const IFlushTarget *self) + const IFlushHandler &handler) { search::SerialNum ret(handler.getCurrentSerialNumber()); for (const IFlushTarget::SP & target : lst) { - if (self != target.get()) { - ret = std::min(ret, target->getFlushedSerialNum()); - } + ret = std::min(ret, target->getFlushedSerialNum()); } - LOG(debug, "Oldest flushed serial for '%s' will be %" PRIu64 " after flush.", handler.getName().c_str(), ret); + LOG(debug, "Oldest flushed serial for '%s' is %" PRIu64 ".", handler.getName().c_str(), ret); return ret; } @@ -55,11 +52,10 @@ FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, IFlushStrategy::SP strategy, uint32_t numThreads, - uint32_t idleIntervalMS, bool enableAutoPrune) + uint32_t idleIntervalMS) : _closed(false), _maxConcurrent(numThreads), _idleIntervalMS(idleIntervalMS), - _enableAutoPrune(enableAutoPrune), _taskId(0), _threadPool(128 * 1024), _strategy(strategy), @@ -70,7 +66,8 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> _flushing(), _strategyLock(), _strategyMonitor(), - _tlsStatsFactory(tlsStatsFactory) + _tlsStatsFactory(tlsStatsFactory), + _pendingPrune() { // empty } @@ -128,10 +125,10 @@ bool FlushEngine::wait(size_t minimumWaitTimeIfReady) { MonitorGuard guard(_monitor); - if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard)) { + if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) { guard.wait(minimumWaitTimeIfReady); } - while ( ! canFlushMore(guard) ) { + while ( ! canFlushMore(guard) && _pendingPrune.empty()) { guard.wait(1000); // broadcast when flush done } return !_closed; @@ -146,6 +143,9 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) vespalib::string prevFlushName; while (wait(shouldIdle ? _idleIntervalMS : 0)) { shouldIdle = false; + if (prune()) { + continue; // Prune attempted on one or more handlers + } prevFlushName = flushNextTarget(prevFlushName); if ( ! prevFlushName.empty()) { // Sleep at least 10 ms after a successful flush in order to avoid busy loop in case @@ -154,23 +154,26 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) } else { shouldIdle = true; } - if (_enableAutoPrune) { - prune(); - } LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); } } -void FlushEngine::prune() +bool +FlushEngine::prune() { - if (_flushing.empty()) { + std::set<IFlushHandler::SP> toPrune; + { MonitorGuard guard(_monitor); - for (const auto & it : _handlers) { - IFlushHandler & handler(*it.second); - IFlushTarget::List lst = handler.getFlushTargets(); - handler.flushDone(findOldestFlushedSerial(lst, handler, NULL)); + if (_pendingPrune.empty()) { + return false; } + _pendingPrune.swap(toPrune); } + for (const auto &handler : toPrune) { + IFlushTarget::List lst = handler->getFlushTargets(); + handler->flushDone(findOldestFlushedSerial(lst, *handler)); + } + return true; } bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const @@ -201,7 +204,6 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const if (!isFlushing(guard, FlushContext::createName(handler, *target)) || includeFlushingTargets) { ret.push_back(FlushContext::SP(new FlushContext(it.second, IFlushTarget::SP(new CachedFlushTarget(target)), - findOldestFlushedSerial(lst, handler, target.get()), serial))); } else { LOG(debug, "Target '%s' with flushedSerialNum = %ld already has a flush going. Local last serial = %ld.", @@ -263,7 +265,7 @@ FlushEngine::flushAll(const FlushContext::List &lst) ctx->getName().c_str(), ctx->getTarget()->getFlushedSerialNum() + 1, ctx->getHandler()->getCurrentSerialNumber()); - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx, ctx->getOldestFlushable()))); + _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); } else { LOG(debug, "Target '%s' failed to initiate flush of transactions %" PRIu64 " through %" PRIu64 ".", ctx->getName().c_str(), @@ -303,7 +305,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name) name.c_str(), lst.first.size()); FastOS_Thread::Sleep(1000); } - _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx, ctx->getOldestFlushable()))); + _executor.execute(Task::UP(new FlushTask(initFlush(*ctx), *this, ctx))); return ctx->getName(); } @@ -340,6 +342,8 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec()); MonitorGuard guard(_monitor); _flushing.erase(taskId); + assert(ctx.getHandler()); + _pendingPrune.insert(ctx.getHandler()); guard.broadcast(); } @@ -348,7 +352,12 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler) { MonitorGuard guard(_monitor); - return _handlers.putHandler(docTypeName, flushHandler); + IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler)); + if (result) { + _pendingPrune.erase(result); + } + _pendingPrune.insert(flushHandler); + return std::move(result); } IFlushHandler::SP @@ -362,7 +371,9 @@ IFlushHandler::SP FlushEngine::removeFlushHandler(const DocTypeName &docTypeName) { MonitorGuard guard(_monitor); - return _handlers.removeHandler(docTypeName); + IFlushHandler::SP result(_handlers.removeHandler(docTypeName)); + _pendingPrune.erase(result); + return std::move(result); } FlushEngine::FlushMetaSet diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 215a8ac8de5..d8d77c11b8f 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -49,7 +49,6 @@ private: bool _closed; const uint32_t _maxConcurrent; const uint32_t _idleIntervalMS; - const bool _enableAutoPrune; uint32_t _taskId; FastOS_ThreadPool _threadPool; IFlushStrategy::SP _strategy; @@ -61,13 +60,14 @@ private: vespalib::Lock _strategyLock; // serialize setStrategy calls vespalib::Monitor _strategyMonitor; std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory; + std::set<IFlushHandler::SP> _pendingPrune; FlushContext::List getTargetList(bool includeFlushingTargets) const; std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const; FlushContext::SP initNextFlush(const FlushContext::List &lst); vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); - void prune(); + bool prune(); uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); @@ -93,15 +93,10 @@ public: * @param strategy The flushing strategy to use. * @param numThreads The number of worker threads to use. * @param idleInterval The interval between when flushes are checked whne there are no one progressing. - * @param enableAutoPrune Indicate if pruning shall be done even if there - are no flushing happening. Turn off for some tests. - Needed for pruning to be correct if one flush is started - while another is in progress. In that case the pruning - will be too conservative. */ FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStatsFactory, - IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS, bool enableAutoPrune); + IFlushStrategy::SP strategy, uint32_t numThreads, uint32_t idleIntervalMS); /** * Destructor. Waits for all pending tasks to complete. diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp index fc692979753..8af59d3079f 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.cpp @@ -9,12 +9,10 @@ namespace proton { FlushTask::FlushTask(uint32_t taskId, FlushEngine &engine, - const FlushContext::SP &ctx, - search::SerialNum serial) + const FlushContext::SP &ctx) : _taskId(taskId), _engine(engine), - _context(ctx), - _serial(serial) + _context(ctx) { LOG_ASSERT(_context.get() != NULL); } @@ -34,7 +32,6 @@ FlushTask::run() } task->run(); task.reset(); - _context->getHandler()->flushDone(_serial); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h index fd27538ca4b..c10fb740410 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtask.h @@ -25,13 +25,10 @@ public: * @param taskId The identifier used by IFlushStrategy. * @param engine The running flush engine. * @param ctx The context of the flush to perform. - * @param serial The oldest unflushed serial available in the handler once - * this task has been run. */ FlushTask(uint32_t taskId, FlushEngine &engine, - const FlushContext::SP &ctx, - search::SerialNum serial); + const FlushContext::SP &ctx); /** * Destructor. Notifies the engine that the flush is done to prevent the diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h index 6912cbfde13..2d3eddf7af9 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h @@ -76,9 +76,10 @@ public: * up to the given serial number can be pruned from the domain of this * handler. This method is called by an arbitrary worker thread. * - * @param oldestSerial The oldest transaction that is still in use. + * @param flushedSerial Serial number flushed for all flush + * targets belonging to this handler. */ - virtual void flushDone(SerialNum oldestSerial) = 0; + virtual void flushDone(SerialNum flushedSerial) = 0; /* * This method is called to sync tls to stable media, up to and diff --git a/searchcore/src/vespa/searchcore/proton/server/configstore.h b/searchcore/src/vespa/searchcore/proton/server/configstore.h index ee6352d61d5..38d0dcfe7bd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/configstore.h +++ b/searchcore/src/vespa/searchcore/proton/server/configstore.h @@ -33,6 +33,12 @@ struct ConfigStore : FeedConfigStore { SerialNum serialNum) = 0; virtual void removeInvalid() = 0; + /** + * Perform prune after everything up to and including serialNum has been + * flushed to stable storage. + * + * @param serialNum The serial number flushed to stable storage. + */ virtual void prune(SerialNum serialNum) = 0; virtual SerialNum getBestSerialNum() const = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index eaa166afe9c..fb5e426933a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -648,13 +648,13 @@ DocumentDB::onTransactionLogReplayDone() void -DocumentDB::onPerformPrune(SerialNum oldestSerial) +DocumentDB::onPerformPrune(SerialNum flushedSerial) { if (!getAllowPrune()) { assert(_state.getClosed()); return; } - _config_store->prune(oldestSerial); + _config_store->prune(flushedSerial); } @@ -760,9 +760,9 @@ DocumentDB::getFlushTargets() } void -DocumentDB::flushDone(SerialNum oldestSerial) +DocumentDB::flushDone(SerialNum flushedSerial) { - _feedHandler.flushDone(oldestSerial); + _feedHandler.flushDone(flushedSerial); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 0c49c2c0170..ebfede59497 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -201,7 +201,7 @@ private: * Implements FeedHandler::IOwner */ virtual void onTransactionLogReplayDone() __attribute__((noinline)); - virtual void onPerformPrune(SerialNum oldestSerial); + virtual void onPerformPrune(SerialNum flushedSerial); virtual bool isFeedBlockedByRejectedConfig(); /** @@ -391,7 +391,7 @@ public: getDocsums(const search::engine::DocsumRequest & request); IFlushTarget::List getFlushTargets(); - void flushDone(SerialNum oldestSerial); + void flushDone(SerialNum flushedSerial); virtual SerialNum getCurrentSerialNumber() const diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 61e2a83bd37..6b7eadb2cb8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -432,32 +432,32 @@ FeedHandler::performEof() void -FeedHandler::performFlushDone(SerialNum oldestSerial) +FeedHandler::performFlushDone(SerialNum flushedSerial) { assert(_writeService.master().isCurrentThread()); - // XXX: oldestSerial can go backwards when attribute vectors are + // XXX: flushedSerial can go backwards when attribute vectors are // resurrected. This can be avoided if resurrected attribute vectors // pretends to have been flushed at resurrect time. - if (oldestSerial <= _prunedSerialNum) { + if (flushedSerial <= _prunedSerialNum) { return; // Cannot unprune. } if (!_owner.getAllowPrune()) { - _prunedSerialNum = oldestSerial; + _prunedSerialNum = flushedSerial; _delayedPrune = true; return; } _delayedPrune = false; - performPrune(oldestSerial); + performPrune(flushedSerial); } void -FeedHandler::performPrune(SerialNum oldestSerial) +FeedHandler::performPrune(SerialNum flushedSerial) { try { - tlsPrune(oldestSerial); // throws on error - LOG(debug, "Pruned TLS to token %" PRIu64 ".", oldestSerial); - _owner.onPerformPrune(oldestSerial); + tlsPrune(flushedSerial); // throws on error + LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial); + _owner.onPerformPrune(flushedSerial); } catch (const vespalib::IllegalStateException & e) { LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what()); } @@ -606,7 +606,7 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, void -FeedHandler::flushDone(SerialNum oldestSerial) +FeedHandler::flushDone(SerialNum flushedSerial) { // Called by flush worker thread after performing a flush task _writeService.master().execute( @@ -614,7 +614,7 @@ FeedHandler::flushDone(SerialNum oldestSerial) makeClosure( this, &FeedHandler::performFlushDone, - oldestSerial))); + flushedSerial))); } void FeedHandler::changeToNormalFeedState(void) { diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 536013e0d02..a1bf80181db 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -62,7 +62,7 @@ public: virtual void performWipeHistory() = 0; virtual void onTransactionLogReplayDone() = 0; virtual void enterRedoReprocessState() = 0; - virtual void onPerformPrune(SerialNum oldestSerial) = 0; + virtual void onPerformPrune(SerialNum flushedSerial) = 0; virtual bool isFeedBlockedByRejectedConfig() = 0; virtual bool getAllowPrune() const = 0; }; @@ -153,10 +153,10 @@ private: * Used when flushing is done */ void - performFlushDone(SerialNum oldestSerial); + performFlushDone(SerialNum flushedSerial); void - performPrune(SerialNum oldestSerial); + performPrune(SerialNum flushedSerial); public: void @@ -240,10 +240,10 @@ public: /** * Called when a flush is done and allows pruning of the transaction log. * - * @param oldestSerial The oldest serial number that is still in use. + * @param flushedSerial serial number flushed for all relevant flush targets. */ void - flushDone(SerialNum oldestSerial); + flushDone(SerialNum flushedSerial); /** * Used to flip between normal and recovery feed states. diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp index 7736161ddbc..5599b245655 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp @@ -36,9 +36,9 @@ FlushHandlerProxy::getCurrentSerialNumber(void) const void -FlushHandlerProxy::flushDone(SerialNum oldestSerial) +FlushHandlerProxy::flushDone(SerialNum flushedSerial) { - _documentDB->flushDone(oldestSerial); + _documentDB->flushDone(flushedSerial); } diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h index 7e4cd1f3176..9f27c29c2bb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h @@ -28,7 +28,7 @@ public: getCurrentSerialNumber(void) const; virtual void - flushDone(SerialNum oldestSerial); + flushDone(SerialNum flushedSerial); virtual void syncTls(SerialNum syncTo); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 60c54bf1ef3..8f08cb233ee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -300,7 +300,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) vespalib::chdir(protonConfig.basedir); _tls->start(); _flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), - strategy, flush.maxconcurrent, flush.idleinterval*1000, true)); + strategy, flush.maxconcurrent, flush.idleinterval*1000)); _fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL)); _fs4Server->setTCPNoDelay(true); _metricsEngine->addExternalMetrics(_fs4Server->getMetrics()); |