aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-05 15:53:03 +0200
committerGitHub <noreply@github.com>2023-05-05 15:53:03 +0200
commitf32aded80e93429a376a7c671a8c22ffa2c11ded (patch)
treee45545a94a34a6de1d8f81ffa43db2e0671809d1 /searchcore
parentf1ad2f1270209e107f931e111f6030dd5629d93d (diff)
Revert "Revert "Balder/refactor flushengine runloop""
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp11
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h18
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h20
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp85
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp51
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h16
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h3
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp4
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/iflushtarget.h19
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexflushtarget.h3
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h2
29 files changed, 158 insertions, 162 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index a7efe2952eb..3fdc5a8ce9f 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -450,6 +450,17 @@ struct Fixture
}
};
+TEST("require that leaf defaults are sane") {
+ test::DummyFlushTarget leaf("dummy");
+ EXPECT_FALSE(leaf.needUrgentFlush());
+ EXPECT_EQUAL(0.0, leaf.get_replay_operation_cost());
+ EXPECT_TRUE(IFlushTarget::Priority::NORMAL == leaf.getPriority());
+ EXPECT_TRUE(50 == static_cast<int>(IFlushTarget::Priority::NORMAL));
+ EXPECT_TRUE(100 == static_cast<int>(IFlushTarget::Priority::HIGH));
+ EXPECT_TRUE(IFlushTarget::Priority::NORMAL < IFlushTarget::Priority::HIGH);
+ EXPECT_TRUE(IFlushTarget::Priority::HIGH > IFlushTarget::Priority::NORMAL);
+}
+
TEST_F("require that strategy controls flush target", Fixture(1, IINTERVAL))
{
vespalib::Gate fooG, barG;
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 880bf8aa3e0..2f6ebcd967f 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -344,6 +344,10 @@ TEST_F(IndexManagerTest, require_that_large_memory_footprint_triggers_urgent_flu
EXPECT_TRUE(IndexFlushTarget(_index_manager->getMaintainer(), FlushStats(17_Gi)).needUrgentFlush());
}
+TEST_F(IndexManagerTest, require_that_flush_priority_is_high) {
+ EXPECT_EQ(IFlushTarget::Priority::HIGH, IndexFlushTarget(_index_manager->getMaintainer()).getPriority());
+}
+
TEST_F(IndexManagerTest, require_that_multiple_flushes_gives_multiple_indexes)
{
size_t flush_count = 10;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
index 8101b29d98c..401cc42de52 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
@@ -155,7 +155,7 @@ FlushableAttribute::FlushableAttribute(AttributeVectorSP attr,
vespalib::ISequencedTaskExecutor &
attributeFieldWriter,
const HwInfo &hwInfo)
- : IFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE),
+ : LeafFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE),
_attr(attr),
_cleanUpAfterFlush(true),
_lastStats(),
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
index 2ae63bef4db..56dd0e0dfec 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
@@ -19,7 +19,7 @@ class TransientResourceUsage;
/**
* Implementation of IFlushTarget interface for attribute vectors.
*/
-class FlushableAttribute : public searchcorespi::IFlushTarget
+class FlushableAttribute : public searchcorespi::LeafFlushTarget
{
private:
/**
@@ -29,15 +29,15 @@ private:
using AttributeVectorSP = std::shared_ptr<search::AttributeVector>;
using FlushStats = searchcorespi::FlushStats;
- AttributeVectorSP _attr;
- bool _cleanUpAfterFlush;
- FlushStats _lastStats;
- const search::TuneFileAttributes _tuneFileAttributes;
+ AttributeVectorSP _attr;
+ bool _cleanUpAfterFlush;
+ FlushStats _lastStats;
+ const search::TuneFileAttributes _tuneFileAttributes;
const search::common::FileHeaderContext &_fileHeaderContext;
- vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
- HwInfo _hwInfo;
- std::shared_ptr<AttributeDirectory> _attrDir;
- double _replay_operation_cost;
+ vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
+ HwInfo _hwInfo;
+ std::shared_ptr<AttributeDirectory> _attrDir;
+ double _replay_operation_cost;
Task::UP internalInitFlush(SerialNum currentSerial);
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
index 06bf8d0a8a6..57435c91b5f 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
@@ -66,7 +66,7 @@ private:
}
SummaryGCTarget::SummaryGCTarget(const vespalib::string & name, vespalib::Executor & summaryService, IDocumentStore & docStore)
- : IFlushTarget(name, Type::GC, Component::DOCUMENT_STORE),
+ : LeafFlushTarget(name, Type::GC, Component::DOCUMENT_STORE),
_summaryService(summaryService),
_docStore(docStore),
_lastStats()
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
index 083f763d8e6..56e17d76210 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
@@ -12,7 +12,7 @@ namespace proton {
/**
* This class implements the IFlushTarget interface to proxy a summary manager.
*/
-class SummaryGCTarget : public searchcorespi::IFlushTarget {
+class SummaryGCTarget : public searchcorespi::LeafFlushTarget {
public:
using FlushStats = searchcorespi::FlushStats;
using IDocumentStore = search::IDocumentStore;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
index 45fc23175bf..5cb60c907f1 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
@@ -43,7 +43,7 @@ public:
SummaryFlushTarget::SummaryFlushTarget(IDocumentStore & docStore,
vespalib::Executor & summaryService)
- : IFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE),
+ : LeafFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE),
_docStore(docStore),
_summaryService(summaryService),
_lastStats()
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
index f864b922af8..1ac7b6c9c0e 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
@@ -9,7 +9,7 @@ namespace proton {
/**
* This class implements the IFlushTarget interface to proxy a summary manager.
*/
-class SummaryFlushTarget : public searchcorespi::IFlushTarget {
+class SummaryFlushTarget : public searchcorespi::LeafFlushTarget {
private:
using FlushStats = searchcorespi::FlushStats;
search::IDocumentStore & _docStore;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 543af05db45..d1e39628c83 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -42,7 +42,7 @@ namespace proton {
namespace {
-class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget
+class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget
{
using ICompactableLidSpace = search::common::ICompactableLidSpace;
vespalib::Executor & _summaryService;
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
index a712035e9af..db3d8adfe26 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
@@ -150,7 +150,7 @@ DocumentMetaStoreFlushTarget::
DocumentMetaStoreFlushTarget(const DocumentMetaStore::SP dms, ITlsSyncer &tlsSyncer,
const vespalib::string & baseDir, const TuneFileAttributes &tuneFileAttributes,
const FileHeaderContext &fileHeaderContext, const HwInfo &hwInfo)
- : IFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE),
+ : LeafFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE),
_dms(dms),
_tlsSyncer(tlsSyncer),
_baseDir(baseDir),
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
index 777d7de1e17..5a3a671553c 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
@@ -19,7 +19,7 @@ class TransientResourceUsage;
/**
* Implementation of IFlushTarget interface for document meta store.
**/
-class DocumentMetaStoreFlushTarget : public searchcorespi::IFlushTarget
+class DocumentMetaStoreFlushTarget : public searchcorespi::LeafFlushTarget
{
private:
/**
@@ -29,16 +29,16 @@ private:
using DocumentMetaStoreSP = std::shared_ptr<DocumentMetaStore>;
using FlushStats = searchcorespi::FlushStats;
- DocumentMetaStoreSP _dms;
- ITlsSyncer &_tlsSyncer;
- vespalib::string _baseDir;
- bool _cleanUpAfterFlush;
- FlushStats _lastStats;
- const search::TuneFileAttributes _tuneFileAttributes;
+ DocumentMetaStoreSP _dms;
+ ITlsSyncer &_tlsSyncer;
+ vespalib::string _baseDir;
+ bool _cleanUpAfterFlush;
+ FlushStats _lastStats;
+ const search::TuneFileAttributes _tuneFileAttributes;
const search::common::FileHeaderContext &_fileHeaderContext;
- HwInfo _hwInfo;
- std::shared_ptr<AttributeDiskLayout> _diskLayout;
- std::shared_ptr<AttributeDirectory> _dmsDir;
+ HwInfo _hwInfo;
+ std::shared_ptr<AttributeDiskLayout> _diskLayout;
+ std::shared_ptr<AttributeDirectory> _dmsDir;
public:
using SP = std::shared_ptr<DocumentMetaStoreFlushTarget>;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp
index 3947ac1d07d..9f212006234 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp
@@ -11,18 +11,10 @@ CachedFlushTarget::CachedFlushTarget(const IFlushTarget::SP &target)
_lastFlushTime(target->getLastFlushTime()),
_memoryGain(target->getApproxMemoryGain()),
_diskGain(target->getApproxDiskGain()),
+ _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk()),
+ _replay_operation_cost(target->get_replay_operation_cost()),
_needUrgentFlush(target->needUrgentFlush()),
- _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk())
-{
- // empty
-}
-
-
-uint64_t
-CachedFlushTarget::getApproxBytesToWriteToDisk() const
-{
- return _approxBytesToWriteToDisk;
-}
-
+ _priority(target->getPriority())
+{ }
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
index 1bd51e9ab11..a43c7c16217 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
@@ -19,8 +19,10 @@ private:
Time _lastFlushTime;
MemoryGain _memoryGain;
DiskGain _diskGain;
- bool _needUrgentFlush;
uint64_t _approxBytesToWriteToDisk;
+ double _replay_operation_cost;
+ bool _needUrgentFlush;
+ Priority _priority;
public:
/**
@@ -47,13 +49,15 @@ public:
SerialNum getFlushedSerialNum() const override { return _flushedSerialNum; }
Time getLastFlushTime() const override { return _lastFlushTime; }
bool needUrgentFlush() const override { return _needUrgentFlush; }
+ Priority getPriority() const override { return _priority; }
+ double get_replay_operation_cost() const override { return _replay_operation_cost; }
Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override {
return _target->initFlush(currentSerial, std::move(flush_token));
}
FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); }
- uint64_t getApproxBytesToWriteToDisk() const override;
+ uint64_t getApproxBytesToWriteToDisk() const override { return _approxBytesToWriteToDisk; }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 3e577bf6cbe..772216e7f37 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -128,8 +128,7 @@ FlushEngine::close()
if (_thread.joinable()) {
_thread.join();
}
- _executor.shutdown();
- _executor.sync();
+ _executor.shutdown().sync();
return *this;
}
@@ -158,36 +157,61 @@ bool
FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune)
{
std::unique_lock<std::mutex> guard(_lock);
- if ( (minimumWaitTimeIfReady != vespalib::duration::zero()) && canFlushMore(guard) && _pendingPrune.empty()) {
+ if (canFlushMore(guard) && _pendingPrune.empty()) {
_cond.wait_for(guard, minimumWaitTimeIfReady);
}
while ( ! canFlushMore(guard) && ( ignorePendingPrune || _pendingPrune.empty())) {
_cond.wait_for(guard, 1s); // broadcast when flush done
}
- return !_closed;
+ return !_closed.load(std::memory_order_relaxed);
+}
+
+void
+FlushEngine::wait_for_slot(IFlushTarget::Priority priority)
+{
+ (void) priority;
+ std::unique_lock<std::mutex> guard(_lock);
+ while ( ! canFlushMore(guard) && _pendingPrune.empty()) {
+ _cond.wait_for(guard, 1s); // broadcast when flush done
+ }
+}
+
+std::pair<bool, vespalib::string>
+FlushEngine::checkAndFlush(vespalib::string prev) {
+ std::pair<FlushContext::List, bool> lst = getSortedTargetList();
+ if (lst.second) {
+ // Everything returned from a priority strategy should be flushed
+ flushAll(lst.first);
+ } else if ( ! lst.first.empty()) {
+ wait_for_slot(lst.first[0]->getTarget()->getPriority());
+ prev = flushNextTarget(prev, lst.first);
+ if (!prev.empty()) {
+ // Sleep 1 ms after a successful flush in order to avoid busy loop in case
+ // of strategy or target error.
+ std::this_thread::sleep_for(1ms);
+ return {false, prev};
+ }
+ }
+ return {true, ""};
}
void
FlushEngine::run()
{
_has_thread = true;
- bool shouldIdle = false;
vespalib::string prevFlushName;
- while (wait(shouldIdle ? _idleInterval : vespalib::duration::zero(), false)) {
- shouldIdle = false;
+ for (vespalib::duration idleInterval=vespalib::duration::zero(); !_closed.load(std::memory_order_relaxed); idleInterval = _idleInterval) {
+ LOG(debug, "Making another check for something to flush, last was '%s'", prevFlushName.c_str());
if (prune()) {
- continue; // Prune attempted on one or more handlers
- }
- prevFlushName = flushNextTarget(prevFlushName);
- if ( ! prevFlushName.empty()) {
- // Sleep 1 ms after a successful flush in order to avoid busy loop in case
- // of strategy or target error.
- std::this_thread::sleep_for(1ms);
+ // Prune attempted on one or more handlers
+ wait_for_slot(IFlushTarget::Priority::NORMAL);
} else {
- shouldIdle = true;
+ auto [needWait, name] = checkAndFlush(prevFlushName);
+ prevFlushName = name;
+ if (needWait) {
+ wait(idleInterval);
+ }
}
- LOG(debug, "Making another wait(idle=%s, timeS=%1.3f) last was '%s'",
- shouldIdle ? "true" : "false", shouldIdle ? vespalib::to_s(_idleInterval) : 0, prevFlushName.c_str());
}
_executor.sync();
prune();
@@ -337,27 +361,21 @@ FlushEngine::flushAll(const FlushContext::List &lst)
}
}
}
+ _executor.sync();
+ prune();
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
+ _priorityStrategy.reset();
+ _strategyCond.notify_all();
}
vespalib::string
-FlushEngine::flushNextTarget(const vespalib::string & name)
+FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts)
{
- std::pair<FlushContext::List,bool> lst = getSortedTargetList();
- if (lst.second) {
- // Everything returned from a priority strategy should be flushed
- flushAll(lst.first);
- _executor.sync();
- prune();
- std::lock_guard<std::mutex> strategyGuard(_strategyLock);
- _priorityStrategy.reset();
- _strategyCond.notify_all();
- return "";
- }
- if (lst.first.empty()) {
+ if (contexts.empty()) {
LOG(debug, "No target to flush.");
return "";
}
- FlushContext::SP ctx = initNextFlush(lst.first);
+ FlushContext::SP ctx = initNextFlush(contexts);
if ( ! ctx) {
LOG(debug, "All targets refused to flush.");
return "";
@@ -365,7 +383,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name)
if ( name == ctx->getName()) {
LOG(info, "The same target %s out of %ld has been asked to flush again. "
"This might indicate flush logic flaw so I will wait 100 ms before doing it.",
- name.c_str(), lst.first.size());
+ name.c_str(), contexts.size());
std::this_thread::sleep_for(100ms);
}
_executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
@@ -445,7 +463,6 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
{
std::lock_guard<std::mutex> guard(_lock);
taskId = _taskId++;
- vespalib::string name(FlushContext::createName(*handler, *target));
FlushInfo flush(taskId, handler->getName(), target);
_flushing[taskId] = flush;
}
@@ -459,7 +476,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
{
std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock);
std::unique_lock<std::mutex> strategyGuard(_strategyLock);
- if (_closed) {
+ if (_closed.load(std::memory_order_relaxed)) {
return;
}
assert(!_priorityStrategy);
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 1d6ed763ff6..06ce34bb831 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -50,7 +50,7 @@ private:
};
using FlushMap = std::map<uint32_t, FlushInfo>;
using FlushHandlerMap = HandlerMap<IFlushHandler>;
- bool _closed;
+ std::atomic<bool> _closed;
const uint32_t _maxConcurrent;
const vespalib::duration _idleInterval;
uint32_t _taskId;
@@ -75,15 +75,20 @@ private:
std::pair<FlushContext::List,bool> getSortedTargetList();
std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx);
FlushContext::SP initNextFlush(const FlushContext::List &lst);
- vespalib::string flushNextTarget(const vespalib::string & name);
+ vespalib::string flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts);
void flushAll(const FlushContext::List &lst);
bool prune();
uint32_t initFlush(const FlushContext &ctx);
uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
void flushDone(const FlushContext &ctx, uint32_t taskId);
bool canFlushMore(const std::unique_lock<std::mutex> &guard) const;
+ void wait_for_slot(IFlushTarget::Priority priority);
bool wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune);
+ void wait(vespalib::duration minimumWaitTimeIfReady) {
+ wait(minimumWaitTimeIfReady, false);
+ }
bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const;
+ std::pair<bool, vespalib::string> checkAndFlush(vespalib::string prev);
friend class FlushTask;
friend class FlushEngineExplorer;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
index eb35e2b2eb1..b62a9191625 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
@@ -22,61 +22,10 @@ FlushTargetProxy::FlushTargetProxy(const IFlushTarget::SP &target,
{
}
-
-IFlushTarget::MemoryGain
-FlushTargetProxy::getApproxMemoryGain() const
-{
- return _target->getApproxMemoryGain();
-}
-
-
-IFlushTarget::DiskGain
-FlushTargetProxy::getApproxDiskGain() const
-{
- return _target->getApproxDiskGain();
-}
-
-
-IFlushTarget::SerialNum
-FlushTargetProxy::getFlushedSerialNum() const
-{
- return _target->getFlushedSerialNum();
-}
-
-
-IFlushTarget::Time
-FlushTargetProxy::getLastFlushTime() const
-{
- return _target->getLastFlushTime();
-}
-
-
-bool
-FlushTargetProxy::needUrgentFlush() const
-{
- return _target->needUrgentFlush();
-}
-
-
IFlushTarget::Task::UP
FlushTargetProxy::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token)
{
return _target->initFlush(currentSerial, std::move(flush_token));
}
-
-FlushStats
-FlushTargetProxy::getLastFlushStats() const
-{
- return _target->getLastFlushStats();
-}
-
-
-uint64_t
-FlushTargetProxy::getApproxBytesToWriteToDisk() const
-{
- return _target->getApproxBytesToWriteToDisk();
-}
-
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
index 0967ca1d6c1..e2ac832e023 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
@@ -41,14 +41,16 @@ public:
*/
const IFlushTarget::SP & getFlushTarget() const { return _target; }
// Implements IFlushTarget.
- MemoryGain getApproxMemoryGain() const override;
- DiskGain getApproxDiskGain() const override;
- SerialNum getFlushedSerialNum() const override;
- Time getLastFlushTime() const override;
- bool needUrgentFlush() const override;
+ MemoryGain getApproxMemoryGain() const override { return _target->getApproxMemoryGain(); }
+ DiskGain getApproxDiskGain() const override { return _target->getApproxDiskGain(); }
+ SerialNum getFlushedSerialNum() const override { return _target->getFlushedSerialNum(); }
+ Time getLastFlushTime() const override { return _target->getLastFlushTime(); }
+ bool needUrgentFlush() const override { return _target->needUrgentFlush(); }
+ Priority getPriority() const override { return _target->getPriority(); }
+ uint64_t getApproxBytesToWriteToDisk() const override { return _target->getApproxBytesToWriteToDisk(); }
+ searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); }
+ double get_replay_operation_cost() const override { return _target->get_replay_operation_cost(); }
Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
- searchcorespi::FlushStats getLastFlushStats() const override;
- uint64_t getApproxBytesToWriteToDisk() const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
index b824f2a7c88..d292f8347a7 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
@@ -6,6 +6,7 @@
namespace proton {
using searchcorespi::IFlushTarget;
+using searchcorespi::LeafFlushTarget;
using searchcorespi::FlushStats;
using searchcorespi::FlushTask;
@@ -46,7 +47,7 @@ ShrinkLidSpaceFlushTarget::ShrinkLidSpaceFlushTarget(const vespalib::string &nam
SerialNum flushedSerialNum,
Time lastFlushTime,
std::shared_ptr<ICompactableLidSpace> target)
- : IFlushTarget(name, type, component),
+ : LeafFlushTarget(name, type, component),
_target(std::move(target)),
_flushedSerialNum(flushedSerialNum),
@@ -80,12 +81,6 @@ ShrinkLidSpaceFlushTarget::getLastFlushTime() const
return _lastFlushTime;
}
-bool
-ShrinkLidSpaceFlushTarget::needUrgentFlush() const
-{
- return false;
-}
-
IFlushTarget::Task::UP
ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
index e3efbd9b273..13f221e40a7 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
@@ -11,7 +11,7 @@ namespace proton {
/**
* Implements a flush target that shrinks lid space in target.
*/
-class ShrinkLidSpaceFlushTarget : public searchcorespi::IFlushTarget
+class ShrinkLidSpaceFlushTarget : public searchcorespi::LeafFlushTarget
{
/**
* Task representing that shrinking has been performed.
@@ -46,7 +46,6 @@ public:
DiskGain getApproxDiskGain() const override;
SerialNum getFlushedSerialNum() const override;
Time getLastFlushTime() const override;
- bool needUrgentFlush() const override;
Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
searchcorespi::FlushStats getLastFlushStats() const override;
uint64_t getApproxBytesToWriteToDisk() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
index 5b49c724c6f..852ed73dc74 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
@@ -30,10 +30,4 @@ JobTrackedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search
return FlushTask::UP();
}
-uint64_t
-JobTrackedFlushTarget::getApproxBytesToWriteToDisk() const
-{
- return _target->getApproxBytesToWriteToDisk();
-}
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
index 35d1b0b0b12..c09c1fac055 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
@@ -19,7 +19,7 @@ private:
public:
JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker,
std::shared_ptr<searchcorespi::IFlushTarget> target);
- ~JobTrackedFlushTarget();
+ ~JobTrackedFlushTarget() override;
const IJobTracker &getTracker() const { return *_tracker; }
const searchcorespi::IFlushTarget &getTarget() const { return *_target; }
@@ -40,13 +40,18 @@ public:
bool needUrgentFlush() const override {
return _target->needUrgentFlush();
}
+ double get_replay_operation_cost() const override {
+ return _target->get_replay_operation_cost();
+ }
+ Priority getPriority() const override { return _target->getPriority(); }
searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
searchcorespi::FlushStats getLastFlushStats() const override {
return _target->getLastFlushStats();
}
- uint64_t getApproxBytesToWriteToDisk() const override;
+ uint64_t getApproxBytesToWriteToDisk() const override {
+ return _target->getApproxBytesToWriteToDisk();
+ }
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp
index 8915e3b367c..29b31dd2add 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp
@@ -5,10 +5,10 @@
namespace proton::test {
DummyFlushTarget::DummyFlushTarget(const vespalib::string &name) noexcept
- : searchcorespi::IFlushTarget(name)
+ : searchcorespi::LeafFlushTarget(name, Type::OTHER, Component::OTHER)
{}
DummyFlushTarget::DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept
- : searchcorespi::IFlushTarget(name, type, component)
+ : searchcorespi::LeafFlushTarget(name, type, component)
{}
DummyFlushTarget::~DummyFlushTarget() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
index a9206233c9d..6b261f7bc4e 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
@@ -5,7 +5,7 @@
namespace proton::test {
-struct DummyFlushTarget : public searchcorespi::IFlushTarget
+struct DummyFlushTarget : public searchcorespi::LeafFlushTarget
{
DummyFlushTarget(const vespalib::string &name) noexcept;
DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept;
@@ -14,7 +14,6 @@ struct DummyFlushTarget : public searchcorespi::IFlushTarget
DiskGain getApproxDiskGain() const override { return DiskGain(0, 0); }
SerialNum getFlushedSerialNum() const override { return 0; }
Time getLastFlushTime() const override { return Time(); }
- bool needUrgentFlush() const override { return false; }
searchcorespi::FlushTask::UP initFlush(SerialNum, std::shared_ptr<search::IFlushToken>) override {
return searchcorespi::FlushTask::UP();
}
diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp
index d821e06a2a3..b31113e1abc 100644
--- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp
+++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp
@@ -16,4 +16,8 @@ IFlushTarget::IFlushTarget(const vespalib::string &name, const Type &type, const
IFlushTarget::~IFlushTarget() = default;
+LeafFlushTarget::LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept
+ : IFlushTarget(name, type, component)
+{}
+
}
diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h
index e3ddf98ce9f..9e960757115 100644
--- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h
+++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h
@@ -38,6 +38,11 @@ public:
OTHER
};
+ enum class Priority {
+ NORMAL = 50,
+ HIGH = 100
+ };
+
private:
vespalib::string _name;
Type _type;
@@ -133,7 +138,7 @@ public:
/**
* Return cost of replaying a feed operation relative to cost of reading a feed operation from tls.
*/
- virtual double get_replay_operation_cost() const { return 0.0; }
+ virtual double get_replay_operation_cost() const = 0;
/**
* Returns the last serial number for the transaction applied to
@@ -156,7 +161,10 @@ public:
*
* @return true if an urgent flush is needed
*/
- virtual bool needUrgentFlush() const { return false; }
+ virtual bool needUrgentFlush() const = 0;
+
+ /// Returns a priority for this target
+ virtual Priority getPriority() const = 0;
/**
* Initiates the flushing of temporary memory. This method must perform
@@ -175,7 +183,14 @@ public:
* @return The stats for the last flush.
*/
virtual FlushStats getLastFlushStats() const = 0;
+};
+class LeafFlushTarget : public IFlushTarget {
+public:
+ LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept;
+ bool needUrgentFlush() const override { return false; }
+ Priority getPriority() const override { return Priority::NORMAL; }
+ double get_replay_operation_cost() const override { return 0.0; }
};
} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
index 53fb21bf1ed..b5a5e2c2843 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -10,7 +10,7 @@ LOG_SETUP(".searchcorespi.index.indexflushtarget");
namespace searchcorespi::index {
IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats)
- : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX),
+ : LeafFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX),
_indexMaintainer(indexMaintainer),
_flushStats(flushStats),
_numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()),
diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h
index 2b9ecc9574b..9f524bc341d 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h
+++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h
@@ -9,7 +9,7 @@ namespace searchcorespi::index {
/**
* Flush target for flushing a memory index in an IndexMaintainer.
**/
-class IndexFlushTarget : public IFlushTarget {
+class IndexFlushTarget : public LeafFlushTarget {
private:
IndexMaintainer &_indexMaintainer;
const IndexMaintainer::FlushStats _flushStats;
@@ -29,6 +29,7 @@ public:
Time getLastFlushTime() const override;
bool needUrgentFlush() const override;
+ Priority getPriority() const override { return Priority::HIGH; }
Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
FlushStats getLastFlushStats() const override { return _lastStats; }
diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
index 6755976939b..562d49a4348 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -38,7 +38,7 @@ public:
}
IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer)
- : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX),
+ : LeafFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX),
_indexMaintainer(indexMaintainer),
_fusionStats(indexMaintainer.getFusionStats()),
_lastStats()
diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h
index 7a9f44e6612..2be7bcc33a9 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h
+++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h
@@ -9,7 +9,7 @@ namespace searchcorespi::index {
/**
* Flush target for doing fusion on disk indexes in an IndexMaintainer.
**/
-class IndexFusionTarget : public IFlushTarget {
+class IndexFusionTarget : public LeafFlushTarget {
private:
IndexMaintainer &_indexMaintainer;
IndexMaintainer::FusionStats _fusionStats;