diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-05 15:53:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-05 15:53:03 +0200 |
commit | f32aded80e93429a376a7c671a8c22ffa2c11ded (patch) | |
tree | e45545a94a34a6de1d8f81ffa43db2e0671809d1 /searchcore | |
parent | f1ad2f1270209e107f931e111f6030dd5629d93d (diff) |
Revert "Revert "Balder/refactor flushengine runloop""
Diffstat (limited to 'searchcore')
29 files changed, 158 insertions, 162 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index a7efe2952eb..3fdc5a8ce9f 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -450,6 +450,17 @@ struct Fixture } }; +TEST("require that leaf defaults are sane") { + test::DummyFlushTarget leaf("dummy"); + EXPECT_FALSE(leaf.needUrgentFlush()); + EXPECT_EQUAL(0.0, leaf.get_replay_operation_cost()); + EXPECT_TRUE(IFlushTarget::Priority::NORMAL == leaf.getPriority()); + EXPECT_TRUE(50 == static_cast<int>(IFlushTarget::Priority::NORMAL)); + EXPECT_TRUE(100 == static_cast<int>(IFlushTarget::Priority::HIGH)); + EXPECT_TRUE(IFlushTarget::Priority::NORMAL < IFlushTarget::Priority::HIGH); + EXPECT_TRUE(IFlushTarget::Priority::HIGH > IFlushTarget::Priority::NORMAL); +} + TEST_F("require that strategy controls flush target", Fixture(1, IINTERVAL)) { vespalib::Gate fooG, barG; diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 880bf8aa3e0..2f6ebcd967f 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -344,6 +344,10 @@ TEST_F(IndexManagerTest, require_that_large_memory_footprint_triggers_urgent_flu EXPECT_TRUE(IndexFlushTarget(_index_manager->getMaintainer(), FlushStats(17_Gi)).needUrgentFlush()); } +TEST_F(IndexManagerTest, require_that_flush_priority_is_high) { + EXPECT_EQ(IFlushTarget::Priority::HIGH, IndexFlushTarget(_index_manager->getMaintainer()).getPriority()); +} + TEST_F(IndexManagerTest, require_that_multiple_flushes_gives_multiple_indexes) { size_t flush_count = 10; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index 8101b29d98c..401cc42de52 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -155,7 +155,7 @@ FlushableAttribute::FlushableAttribute(AttributeVectorSP attr, vespalib::ISequencedTaskExecutor & attributeFieldWriter, const HwInfo &hwInfo) - : IFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE), + : LeafFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE), _attr(attr), _cleanUpAfterFlush(true), _lastStats(), diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h index 2ae63bef4db..56dd0e0dfec 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h @@ -19,7 +19,7 @@ class TransientResourceUsage; /** * Implementation of IFlushTarget interface for attribute vectors. */ -class FlushableAttribute : public searchcorespi::IFlushTarget +class FlushableAttribute : public searchcorespi::LeafFlushTarget { private: /** @@ -29,15 +29,15 @@ private: using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; using FlushStats = searchcorespi::FlushStats; - AttributeVectorSP _attr; - bool _cleanUpAfterFlush; - FlushStats _lastStats; - const search::TuneFileAttributes _tuneFileAttributes; + AttributeVectorSP _attr; + bool _cleanUpAfterFlush; + FlushStats _lastStats; + const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - vespalib::ISequencedTaskExecutor &_attributeFieldWriter; - HwInfo _hwInfo; - std::shared_ptr<AttributeDirectory> _attrDir; - double _replay_operation_cost; + vespalib::ISequencedTaskExecutor &_attributeFieldWriter; + HwInfo _hwInfo; + std::shared_ptr<AttributeDirectory> _attrDir; + double _replay_operation_cost; Task::UP internalInitFlush(SerialNum currentSerial); diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp index 06bf8d0a8a6..57435c91b5f 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp @@ -66,7 +66,7 @@ private: } SummaryGCTarget::SummaryGCTarget(const vespalib::string & name, vespalib::Executor & summaryService, IDocumentStore & docStore) - : IFlushTarget(name, Type::GC, Component::DOCUMENT_STORE), + : LeafFlushTarget(name, Type::GC, Component::DOCUMENT_STORE), _summaryService(summaryService), _docStore(docStore), _lastStats() diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h index 083f763d8e6..56e17d76210 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h @@ -12,7 +12,7 @@ namespace proton { /** * This class implements the IFlushTarget interface to proxy a summary manager. */ -class SummaryGCTarget : public searchcorespi::IFlushTarget { +class SummaryGCTarget : public searchcorespi::LeafFlushTarget { public: using FlushStats = searchcorespi::FlushStats; using IDocumentStore = search::IDocumentStore; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp index 45fc23175bf..5cb60c907f1 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp @@ -43,7 +43,7 @@ public: SummaryFlushTarget::SummaryFlushTarget(IDocumentStore & docStore, vespalib::Executor & summaryService) - : IFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE), + : LeafFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE), _docStore(docStore), _summaryService(summaryService), _lastStats() diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h index f864b922af8..1ac7b6c9c0e 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h @@ -9,7 +9,7 @@ namespace proton { /** * This class implements the IFlushTarget interface to proxy a summary manager. */ -class SummaryFlushTarget : public searchcorespi::IFlushTarget { +class SummaryFlushTarget : public searchcorespi::LeafFlushTarget { private: using FlushStats = searchcorespi::FlushStats; search::IDocumentStore & _docStore; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index 543af05db45..d1e39628c83 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -42,7 +42,7 @@ namespace proton { namespace { -class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget +class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget { using ICompactableLidSpace = search::common::ICompactableLidSpace; vespalib::Executor & _summaryService; diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp index a712035e9af..db3d8adfe26 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp @@ -150,7 +150,7 @@ DocumentMetaStoreFlushTarget:: DocumentMetaStoreFlushTarget(const DocumentMetaStore::SP dms, ITlsSyncer &tlsSyncer, const vespalib::string & baseDir, const TuneFileAttributes &tuneFileAttributes, const FileHeaderContext &fileHeaderContext, const HwInfo &hwInfo) - : IFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE), + : LeafFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE), _dms(dms), _tlsSyncer(tlsSyncer), _baseDir(baseDir), diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h index 777d7de1e17..5a3a671553c 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h @@ -19,7 +19,7 @@ class TransientResourceUsage; /** * Implementation of IFlushTarget interface for document meta store. **/ -class DocumentMetaStoreFlushTarget : public searchcorespi::IFlushTarget +class DocumentMetaStoreFlushTarget : public searchcorespi::LeafFlushTarget { private: /** @@ -29,16 +29,16 @@ private: using DocumentMetaStoreSP = std::shared_ptr<DocumentMetaStore>; using FlushStats = searchcorespi::FlushStats; - DocumentMetaStoreSP _dms; - ITlsSyncer &_tlsSyncer; - vespalib::string _baseDir; - bool _cleanUpAfterFlush; - FlushStats _lastStats; - const search::TuneFileAttributes _tuneFileAttributes; + DocumentMetaStoreSP _dms; + ITlsSyncer &_tlsSyncer; + vespalib::string _baseDir; + bool _cleanUpAfterFlush; + FlushStats _lastStats; + const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - HwInfo _hwInfo; - std::shared_ptr<AttributeDiskLayout> _diskLayout; - std::shared_ptr<AttributeDirectory> _dmsDir; + HwInfo _hwInfo; + std::shared_ptr<AttributeDiskLayout> _diskLayout; + std::shared_ptr<AttributeDirectory> _dmsDir; public: using SP = std::shared_ptr<DocumentMetaStoreFlushTarget>; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp index 3947ac1d07d..9f212006234 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp @@ -11,18 +11,10 @@ CachedFlushTarget::CachedFlushTarget(const IFlushTarget::SP &target) _lastFlushTime(target->getLastFlushTime()), _memoryGain(target->getApproxMemoryGain()), _diskGain(target->getApproxDiskGain()), + _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk()), + _replay_operation_cost(target->get_replay_operation_cost()), _needUrgentFlush(target->needUrgentFlush()), - _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk()) -{ - // empty -} - - -uint64_t -CachedFlushTarget::getApproxBytesToWriteToDisk() const -{ - return _approxBytesToWriteToDisk; -} - + _priority(target->getPriority()) +{ } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h index 1bd51e9ab11..a43c7c16217 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h @@ -19,8 +19,10 @@ private: Time _lastFlushTime; MemoryGain _memoryGain; DiskGain _diskGain; - bool _needUrgentFlush; uint64_t _approxBytesToWriteToDisk; + double _replay_operation_cost; + bool _needUrgentFlush; + Priority _priority; public: /** @@ -47,13 +49,15 @@ public: SerialNum getFlushedSerialNum() const override { return _flushedSerialNum; } Time getLastFlushTime() const override { return _lastFlushTime; } bool needUrgentFlush() const override { return _needUrgentFlush; } + Priority getPriority() const override { return _priority; } + double get_replay_operation_cost() const override { return _replay_operation_cost; } Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { return _target->initFlush(currentSerial, std::move(flush_token)); } FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } - uint64_t getApproxBytesToWriteToDisk() const override; + uint64_t getApproxBytesToWriteToDisk() const override { return _approxBytesToWriteToDisk; } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 3e577bf6cbe..772216e7f37 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -128,8 +128,7 @@ FlushEngine::close() if (_thread.joinable()) { _thread.join(); } - _executor.shutdown(); - _executor.sync(); + _executor.shutdown().sync(); return *this; } @@ -158,36 +157,61 @@ bool FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune) { std::unique_lock<std::mutex> guard(_lock); - if ( (minimumWaitTimeIfReady != vespalib::duration::zero()) && canFlushMore(guard) && _pendingPrune.empty()) { + if (canFlushMore(guard) && _pendingPrune.empty()) { _cond.wait_for(guard, minimumWaitTimeIfReady); } while ( ! canFlushMore(guard) && ( ignorePendingPrune || _pendingPrune.empty())) { _cond.wait_for(guard, 1s); // broadcast when flush done } - return !_closed; + return !_closed.load(std::memory_order_relaxed); +} + +void +FlushEngine::wait_for_slot(IFlushTarget::Priority priority) +{ + (void) priority; + std::unique_lock<std::mutex> guard(_lock); + while ( ! canFlushMore(guard) && _pendingPrune.empty()) { + _cond.wait_for(guard, 1s); // broadcast when flush done + } +} + +std::pair<bool, vespalib::string> +FlushEngine::checkAndFlush(vespalib::string prev) { + std::pair<FlushContext::List, bool> lst = getSortedTargetList(); + if (lst.second) { + // Everything returned from a priority strategy should be flushed + flushAll(lst.first); + } else if ( ! lst.first.empty()) { + wait_for_slot(lst.first[0]->getTarget()->getPriority()); + prev = flushNextTarget(prev, lst.first); + if (!prev.empty()) { + // Sleep 1 ms after a successful flush in order to avoid busy loop in case + // of strategy or target error. + std::this_thread::sleep_for(1ms); + return {false, prev}; + } + } + return {true, ""}; } void FlushEngine::run() { _has_thread = true; - bool shouldIdle = false; vespalib::string prevFlushName; - while (wait(shouldIdle ? _idleInterval : vespalib::duration::zero(), false)) { - shouldIdle = false; + for (vespalib::duration idleInterval=vespalib::duration::zero(); !_closed.load(std::memory_order_relaxed); idleInterval = _idleInterval) { + LOG(debug, "Making another check for something to flush, last was '%s'", prevFlushName.c_str()); if (prune()) { - continue; // Prune attempted on one or more handlers - } - prevFlushName = flushNextTarget(prevFlushName); - if ( ! prevFlushName.empty()) { - // Sleep 1 ms after a successful flush in order to avoid busy loop in case - // of strategy or target error. - std::this_thread::sleep_for(1ms); + // Prune attempted on one or more handlers + wait_for_slot(IFlushTarget::Priority::NORMAL); } else { - shouldIdle = true; + auto [needWait, name] = checkAndFlush(prevFlushName); + prevFlushName = name; + if (needWait) { + wait(idleInterval); + } } - LOG(debug, "Making another wait(idle=%s, timeS=%1.3f) last was '%s'", - shouldIdle ? "true" : "false", shouldIdle ? vespalib::to_s(_idleInterval) : 0, prevFlushName.c_str()); } _executor.sync(); prune(); @@ -337,27 +361,21 @@ FlushEngine::flushAll(const FlushContext::List &lst) } } } + _executor.sync(); + prune(); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); + _priorityStrategy.reset(); + _strategyCond.notify_all(); } vespalib::string -FlushEngine::flushNextTarget(const vespalib::string & name) +FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts) { - std::pair<FlushContext::List,bool> lst = getSortedTargetList(); - if (lst.second) { - // Everything returned from a priority strategy should be flushed - flushAll(lst.first); - _executor.sync(); - prune(); - std::lock_guard<std::mutex> strategyGuard(_strategyLock); - _priorityStrategy.reset(); - _strategyCond.notify_all(); - return ""; - } - if (lst.first.empty()) { + if (contexts.empty()) { LOG(debug, "No target to flush."); return ""; } - FlushContext::SP ctx = initNextFlush(lst.first); + FlushContext::SP ctx = initNextFlush(contexts); if ( ! ctx) { LOG(debug, "All targets refused to flush."); return ""; @@ -365,7 +383,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name) if ( name == ctx->getName()) { LOG(info, "The same target %s out of %ld has been asked to flush again. " "This might indicate flush logic flaw so I will wait 100 ms before doing it.", - name.c_str(), lst.first.size()); + name.c_str(), contexts.size()); std::this_thread::sleep_for(100ms); } _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); @@ -445,7 +463,6 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP { std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; - vespalib::string name(FlushContext::createName(*handler, *target)); FlushInfo flush(taskId, handler->getName(), target); _flushing[taskId] = flush; } @@ -459,7 +476,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) { std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); std::unique_lock<std::mutex> strategyGuard(_strategyLock); - if (_closed) { + if (_closed.load(std::memory_order_relaxed)) { return; } assert(!_priorityStrategy); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 1d6ed763ff6..06ce34bb831 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -50,7 +50,7 @@ private: }; using FlushMap = std::map<uint32_t, FlushInfo>; using FlushHandlerMap = HandlerMap<IFlushHandler>; - bool _closed; + std::atomic<bool> _closed; const uint32_t _maxConcurrent; const vespalib::duration _idleInterval; uint32_t _taskId; @@ -75,15 +75,20 @@ private: std::pair<FlushContext::List,bool> getSortedTargetList(); std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx); FlushContext::SP initNextFlush(const FlushContext::List &lst); - vespalib::string flushNextTarget(const vespalib::string & name); + vespalib::string flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts); void flushAll(const FlushContext::List &lst); bool prune(); uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); bool canFlushMore(const std::unique_lock<std::mutex> &guard) const; + void wait_for_slot(IFlushTarget::Priority priority); bool wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune); + void wait(vespalib::duration minimumWaitTimeIfReady) { + wait(minimumWaitTimeIfReady, false); + } bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const; + std::pair<bool, vespalib::string> checkAndFlush(vespalib::string prev); friend class FlushTask; friend class FlushEngineExplorer; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp index eb35e2b2eb1..b62a9191625 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp @@ -22,61 +22,10 @@ FlushTargetProxy::FlushTargetProxy(const IFlushTarget::SP &target, { } - -IFlushTarget::MemoryGain -FlushTargetProxy::getApproxMemoryGain() const -{ - return _target->getApproxMemoryGain(); -} - - -IFlushTarget::DiskGain -FlushTargetProxy::getApproxDiskGain() const -{ - return _target->getApproxDiskGain(); -} - - -IFlushTarget::SerialNum -FlushTargetProxy::getFlushedSerialNum() const -{ - return _target->getFlushedSerialNum(); -} - - -IFlushTarget::Time -FlushTargetProxy::getLastFlushTime() const -{ - return _target->getLastFlushTime(); -} - - -bool -FlushTargetProxy::needUrgentFlush() const -{ - return _target->needUrgentFlush(); -} - - IFlushTarget::Task::UP FlushTargetProxy::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { return _target->initFlush(currentSerial, std::move(flush_token)); } - -FlushStats -FlushTargetProxy::getLastFlushStats() const -{ - return _target->getLastFlushStats(); -} - - -uint64_t -FlushTargetProxy::getApproxBytesToWriteToDisk() const -{ - return _target->getApproxBytesToWriteToDisk(); -} - - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h index 0967ca1d6c1..e2ac832e023 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h @@ -41,14 +41,16 @@ public: */ const IFlushTarget::SP & getFlushTarget() const { return _target; } // Implements IFlushTarget. - MemoryGain getApproxMemoryGain() const override; - DiskGain getApproxDiskGain() const override; - SerialNum getFlushedSerialNum() const override; - Time getLastFlushTime() const override; - bool needUrgentFlush() const override; + MemoryGain getApproxMemoryGain() const override { return _target->getApproxMemoryGain(); } + DiskGain getApproxDiskGain() const override { return _target->getApproxDiskGain(); } + SerialNum getFlushedSerialNum() const override { return _target->getFlushedSerialNum(); } + Time getLastFlushTime() const override { return _target->getLastFlushTime(); } + bool needUrgentFlush() const override { return _target->needUrgentFlush(); } + Priority getPriority() const override { return _target->getPriority(); } + uint64_t getApproxBytesToWriteToDisk() const override { return _target->getApproxBytesToWriteToDisk(); } + searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } + double get_replay_operation_cost() const override { return _target->get_replay_operation_cost(); } Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; - searchcorespi::FlushStats getLastFlushStats() const override; - uint64_t getApproxBytesToWriteToDisk() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp index b824f2a7c88..d292f8347a7 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp @@ -6,6 +6,7 @@ namespace proton { using searchcorespi::IFlushTarget; +using searchcorespi::LeafFlushTarget; using searchcorespi::FlushStats; using searchcorespi::FlushTask; @@ -46,7 +47,7 @@ ShrinkLidSpaceFlushTarget::ShrinkLidSpaceFlushTarget(const vespalib::string &nam SerialNum flushedSerialNum, Time lastFlushTime, std::shared_ptr<ICompactableLidSpace> target) - : IFlushTarget(name, type, component), + : LeafFlushTarget(name, type, component), _target(std::move(target)), _flushedSerialNum(flushedSerialNum), @@ -80,12 +81,6 @@ ShrinkLidSpaceFlushTarget::getLastFlushTime() const return _lastFlushTime; } -bool -ShrinkLidSpaceFlushTarget::needUrgentFlush() const -{ - return false; -} - IFlushTarget::Task::UP ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h index e3efbd9b273..13f221e40a7 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h @@ -11,7 +11,7 @@ namespace proton { /** * Implements a flush target that shrinks lid space in target. */ -class ShrinkLidSpaceFlushTarget : public searchcorespi::IFlushTarget +class ShrinkLidSpaceFlushTarget : public searchcorespi::LeafFlushTarget { /** * Task representing that shrinking has been performed. @@ -46,7 +46,6 @@ public: DiskGain getApproxDiskGain() const override; SerialNum getFlushedSerialNum() const override; Time getLastFlushTime() const override; - bool needUrgentFlush() const override; Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; searchcorespi::FlushStats getLastFlushStats() const override; uint64_t getApproxBytesToWriteToDisk() const override; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp index 5b49c724c6f..852ed73dc74 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp @@ -30,10 +30,4 @@ JobTrackedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search return FlushTask::UP(); } -uint64_t -JobTrackedFlushTarget::getApproxBytesToWriteToDisk() const -{ - return _target->getApproxBytesToWriteToDisk(); -} - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h index 35d1b0b0b12..c09c1fac055 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h @@ -19,7 +19,7 @@ private: public: JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<searchcorespi::IFlushTarget> target); - ~JobTrackedFlushTarget(); + ~JobTrackedFlushTarget() override; const IJobTracker &getTracker() const { return *_tracker; } const searchcorespi::IFlushTarget &getTarget() const { return *_target; } @@ -40,13 +40,18 @@ public: bool needUrgentFlush() const override { return _target->needUrgentFlush(); } + double get_replay_operation_cost() const override { + return _target->get_replay_operation_cost(); + } + Priority getPriority() const override { return _target->getPriority(); } searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } - uint64_t getApproxBytesToWriteToDisk() const override; + uint64_t getApproxBytesToWriteToDisk() const override { + return _target->getApproxBytesToWriteToDisk(); + } }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp index 8915e3b367c..29b31dd2add 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp @@ -5,10 +5,10 @@ namespace proton::test { DummyFlushTarget::DummyFlushTarget(const vespalib::string &name) noexcept - : searchcorespi::IFlushTarget(name) + : searchcorespi::LeafFlushTarget(name, Type::OTHER, Component::OTHER) {} DummyFlushTarget::DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept - : searchcorespi::IFlushTarget(name, type, component) + : searchcorespi::LeafFlushTarget(name, type, component) {} DummyFlushTarget::~DummyFlushTarget() = default; diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h index a9206233c9d..6b261f7bc4e 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h @@ -5,7 +5,7 @@ namespace proton::test { -struct DummyFlushTarget : public searchcorespi::IFlushTarget +struct DummyFlushTarget : public searchcorespi::LeafFlushTarget { DummyFlushTarget(const vespalib::string &name) noexcept; DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept; @@ -14,7 +14,6 @@ struct DummyFlushTarget : public searchcorespi::IFlushTarget DiskGain getApproxDiskGain() const override { return DiskGain(0, 0); } SerialNum getFlushedSerialNum() const override { return 0; } Time getLastFlushTime() const override { return Time(); } - bool needUrgentFlush() const override { return false; } searchcorespi::FlushTask::UP initFlush(SerialNum, std::shared_ptr<search::IFlushToken>) override { return searchcorespi::FlushTask::UP(); } diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp index d821e06a2a3..b31113e1abc 100644 --- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp @@ -16,4 +16,8 @@ IFlushTarget::IFlushTarget(const vespalib::string &name, const Type &type, const IFlushTarget::~IFlushTarget() = default; +LeafFlushTarget::LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept + : IFlushTarget(name, type, component) +{} + } diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h index e3ddf98ce9f..9e960757115 100644 --- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h +++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h @@ -38,6 +38,11 @@ public: OTHER }; + enum class Priority { + NORMAL = 50, + HIGH = 100 + }; + private: vespalib::string _name; Type _type; @@ -133,7 +138,7 @@ public: /** * Return cost of replaying a feed operation relative to cost of reading a feed operation from tls. */ - virtual double get_replay_operation_cost() const { return 0.0; } + virtual double get_replay_operation_cost() const = 0; /** * Returns the last serial number for the transaction applied to @@ -156,7 +161,10 @@ public: * * @return true if an urgent flush is needed */ - virtual bool needUrgentFlush() const { return false; } + virtual bool needUrgentFlush() const = 0; + + /// Returns a priority for this target + virtual Priority getPriority() const = 0; /** * Initiates the flushing of temporary memory. This method must perform @@ -175,7 +183,14 @@ public: * @return The stats for the last flush. */ virtual FlushStats getLastFlushStats() const = 0; +}; +class LeafFlushTarget : public IFlushTarget { +public: + LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept; + bool needUrgentFlush() const override { return false; } + Priority getPriority() const override { return Priority::NORMAL; } + double get_replay_operation_cost() const override { return 0.0; } }; } // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp index 53fb21bf1ed..b5a5e2c2843 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -10,7 +10,7 @@ LOG_SETUP(".searchcorespi.index.indexflushtarget"); namespace searchcorespi::index { IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats) - : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), + : LeafFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), _indexMaintainer(indexMaintainer), _flushStats(flushStats), _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()), diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h index 2b9ecc9574b..9f524bc341d 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h @@ -9,7 +9,7 @@ namespace searchcorespi::index { /** * Flush target for flushing a memory index in an IndexMaintainer. **/ -class IndexFlushTarget : public IFlushTarget { +class IndexFlushTarget : public LeafFlushTarget { private: IndexMaintainer &_indexMaintainer; const IndexMaintainer::FlushStats _flushStats; @@ -29,6 +29,7 @@ public: Time getLastFlushTime() const override; bool needUrgentFlush() const override; + Priority getPriority() const override { return Priority::HIGH; } Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; FlushStats getLastFlushStats() const override { return _lastStats; } diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp index 6755976939b..562d49a4348 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -38,7 +38,7 @@ public: } IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer) - : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), + : LeafFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), _indexMaintainer(indexMaintainer), _fusionStats(indexMaintainer.getFusionStats()), _lastStats() diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h index 7a9f44e6612..2be7bcc33a9 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -9,7 +9,7 @@ namespace searchcorespi::index { /** * Flush target for doing fusion on disk indexes in an IndexMaintainer. **/ -class IndexFusionTarget : public IFlushTarget { +class IndexFusionTarget : public LeafFlushTarget { private: IndexMaintainer &_indexMaintainer; IndexMaintainer::FusionStats _fusionStats; |