diff options
33 files changed, 173 insertions, 166 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/NodeResourcesTuning.java b/config-model/src/main/java/com/yahoo/vespa/model/search/NodeResourcesTuning.java index 2de06e2053a..fe002be0677 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/search/NodeResourcesTuning.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/search/NodeResourcesTuning.java @@ -17,7 +17,7 @@ public class NodeResourcesTuning implements ProtonConfig.Producer { private final static double SUMMARY_FILE_SIZE_AS_FRACTION_OF_MEMORY = 0.02; private final static double SUMMARY_CACHE_SIZE_AS_FRACTION_OF_MEMORY = 0.04; private final static double MEMORY_GAIN_AS_FRACTION_OF_MEMORY = 0.08; - private final static double MIN_MEMORY_PER_FLUSH_THREAD_GB = 16.0; + private final static double MIN_MEMORY_PER_FLUSH_THREAD_GB = 11.0; private final static double TLS_SIZE_FRACTION = 0.02; final static long MB = 1024 * 1024; public final static long GB = MB * 1024; diff --git a/config-model/src/test/java/com/yahoo/vespa/model/search/NodeResourcesTuningTest.java b/config-model/src/test/java/com/yahoo/vespa/model/search/NodeResourcesTuningTest.java index d344be3da9a..e81f7f85f49 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/search/NodeResourcesTuningTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/search/NodeResourcesTuningTest.java @@ -183,9 +183,9 @@ public class NodeResourcesTuningTest { @Test public void require_that_concurrent_flush_threads_is_1_with_low_memory() { assertEquals(1, fromMemAndCpu(1, 8).flush().maxconcurrent()); - assertEquals(1, fromMemAndCpu(15, 8).flush().maxconcurrent()); - assertEquals(1, fromMemAndCpu(16, 8).flush().maxconcurrent()); - assertEquals(2, fromMemAndCpu(17, 8).flush().maxconcurrent()); + assertEquals(1, fromMemAndCpu(10, 8).flush().maxconcurrent()); + assertEquals(1, fromMemAndCpu(11, 8).flush().maxconcurrent()); + assertEquals(2, fromMemAndCpu(12, 8).flush().maxconcurrent()); assertEquals(2, fromMemAndCpu(65, 8).flush().maxconcurrent()); // still capped by max assertEquals(2, fromMemAndCpu(65, 65).flush().maxconcurrent()); // still capped by max } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index 3af63125474..484c5abdb89 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.provision.provisioning; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.CloudName; import com.yahoo.config.provision.ClusterMembership; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.config.provision.Flavor; @@ -120,7 +121,7 @@ class NodeAllocation { if ( candidate.state() == Node.State.active && allocation.removable()) continue; // don't accept; causes removal if ( candidate.state() == Node.State.active && candidate.wantToFail()) continue; // don't accept; causes failing if ( indexes.contains(membership.index())) continue; // duplicate index (just to be sure) - if ( candidate.parent.isPresent() && ! candidate.parent.get().cloudAccount().equals(requestedNodes.cloudAccount())) continue; // wrong account + if (nodeRepository.zone().cloud().name().equals(CloudName.AWS) && candidate.parent.isPresent() && ! candidate.parent.get().cloudAccount().equals(requestedNodes.cloudAccount())) continue; // wrong account boolean resizeable = requestedNodes.considerRetiring() && candidate.isResizable; boolean acceptToRetire = acceptToRetire(candidate); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainerTest.java index 0a1bdb7b400..3992401e29f 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainerTest.java @@ -1,19 +1,20 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.maintenance; -import com.yahoo.config.provision.ClusterInfo; -import com.yahoo.config.provision.IntRange; import com.yahoo.component.Version; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Capacity; import com.yahoo.config.provision.Cloud; import com.yahoo.config.provision.CloudAccount; +import com.yahoo.config.provision.CloudName; +import com.yahoo.config.provision.ClusterInfo; import com.yahoo.config.provision.ClusterMembership; import com.yahoo.config.provision.ClusterResources; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.Flavor; import com.yahoo.config.provision.HostSpec; +import com.yahoo.config.provision.IntRange; import com.yahoo.config.provision.NodeFlavors; import com.yahoo.config.provision.NodeResources; import com.yahoo.config.provision.NodeType; @@ -46,6 +47,7 @@ import com.yahoo.vespa.service.duper.ControllerHostApplication; import com.yahoo.vespa.service.duper.InfraApplication; import com.yahoo.vespa.service.duper.TenantHostApplication; import org.junit.Test; + import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -458,7 +460,7 @@ public class HostCapacityMaintainerTest { @Test public void custom_cloud_account() { - DynamicProvisioningTester tester = new DynamicProvisioningTester(Cloud.builder().dynamicProvisioning(true).build(), + DynamicProvisioningTester tester = new DynamicProvisioningTester(Cloud.builder().name(CloudName.AWS).dynamicProvisioning(true).account(CloudAccount.from("001122334455")).build(), new MockNameResolver().mockAnyLookup()); ProvisioningTester provisioningTester = tester.provisioningTester; ApplicationId applicationId = ApplicationId.from("t1", "a1", "i1"); diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index 3fdc5a8ce9f..a7efe2952eb 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -450,17 +450,6 @@ struct Fixture } }; -TEST("require that leaf defaults are sane") { - test::DummyFlushTarget leaf("dummy"); - EXPECT_FALSE(leaf.needUrgentFlush()); - EXPECT_EQUAL(0.0, leaf.get_replay_operation_cost()); - EXPECT_TRUE(IFlushTarget::Priority::NORMAL == leaf.getPriority()); - EXPECT_TRUE(50 == static_cast<int>(IFlushTarget::Priority::NORMAL)); - EXPECT_TRUE(100 == static_cast<int>(IFlushTarget::Priority::HIGH)); - EXPECT_TRUE(IFlushTarget::Priority::NORMAL < IFlushTarget::Priority::HIGH); - EXPECT_TRUE(IFlushTarget::Priority::HIGH > IFlushTarget::Priority::NORMAL); -} - TEST_F("require that strategy controls flush target", Fixture(1, IINTERVAL)) { vespalib::Gate fooG, barG; diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 2f6ebcd967f..880bf8aa3e0 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -344,10 +344,6 @@ TEST_F(IndexManagerTest, require_that_large_memory_footprint_triggers_urgent_flu EXPECT_TRUE(IndexFlushTarget(_index_manager->getMaintainer(), FlushStats(17_Gi)).needUrgentFlush()); } -TEST_F(IndexManagerTest, require_that_flush_priority_is_high) { - EXPECT_EQ(IFlushTarget::Priority::HIGH, IndexFlushTarget(_index_manager->getMaintainer()).getPriority()); -} - TEST_F(IndexManagerTest, require_that_multiple_flushes_gives_multiple_indexes) { size_t flush_count = 10; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index 401cc42de52..8101b29d98c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -155,7 +155,7 @@ FlushableAttribute::FlushableAttribute(AttributeVectorSP attr, vespalib::ISequencedTaskExecutor & attributeFieldWriter, const HwInfo &hwInfo) - : LeafFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE), + : IFlushTarget(make_string("attribute.flush.%s", attr->getName().c_str()), Type::SYNC, Component::ATTRIBUTE), _attr(attr), _cleanUpAfterFlush(true), _lastStats(), diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h index 56dd0e0dfec..2ae63bef4db 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h @@ -19,7 +19,7 @@ class TransientResourceUsage; /** * Implementation of IFlushTarget interface for attribute vectors. */ -class FlushableAttribute : public searchcorespi::LeafFlushTarget +class FlushableAttribute : public searchcorespi::IFlushTarget { private: /** @@ -29,15 +29,15 @@ private: using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; using FlushStats = searchcorespi::FlushStats; - AttributeVectorSP _attr; - bool _cleanUpAfterFlush; - FlushStats _lastStats; - const search::TuneFileAttributes _tuneFileAttributes; + AttributeVectorSP _attr; + bool _cleanUpAfterFlush; + FlushStats _lastStats; + const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - vespalib::ISequencedTaskExecutor &_attributeFieldWriter; - HwInfo _hwInfo; - std::shared_ptr<AttributeDirectory> _attrDir; - double _replay_operation_cost; + vespalib::ISequencedTaskExecutor &_attributeFieldWriter; + HwInfo _hwInfo; + std::shared_ptr<AttributeDirectory> _attrDir; + double _replay_operation_cost; Task::UP internalInitFlush(SerialNum currentSerial); diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp index 57435c91b5f..06bf8d0a8a6 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp @@ -66,7 +66,7 @@ private: } SummaryGCTarget::SummaryGCTarget(const vespalib::string & name, vespalib::Executor & summaryService, IDocumentStore & docStore) - : LeafFlushTarget(name, Type::GC, Component::DOCUMENT_STORE), + : IFlushTarget(name, Type::GC, Component::DOCUMENT_STORE), _summaryService(summaryService), _docStore(docStore), _lastStats() diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h index 56e17d76210..083f763d8e6 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h @@ -12,7 +12,7 @@ namespace proton { /** * This class implements the IFlushTarget interface to proxy a summary manager. */ -class SummaryGCTarget : public searchcorespi::LeafFlushTarget { +class SummaryGCTarget : public searchcorespi::IFlushTarget { public: using FlushStats = searchcorespi::FlushStats; using IDocumentStore = search::IDocumentStore; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp index 5cb60c907f1..45fc23175bf 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp @@ -43,7 +43,7 @@ public: SummaryFlushTarget::SummaryFlushTarget(IDocumentStore & docStore, vespalib::Executor & summaryService) - : LeafFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE), + : IFlushTarget("summary.flush", Type::SYNC, Component::DOCUMENT_STORE), _docStore(docStore), _summaryService(summaryService), _lastStats() diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h index 1ac7b6c9c0e..f864b922af8 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h @@ -9,7 +9,7 @@ namespace proton { /** * This class implements the IFlushTarget interface to proxy a summary manager. */ -class SummaryFlushTarget : public searchcorespi::LeafFlushTarget { +class SummaryFlushTarget : public searchcorespi::IFlushTarget { private: using FlushStats = searchcorespi::FlushStats; search::IDocumentStore & _docStore; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index d1e39628c83..543af05db45 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -42,7 +42,7 @@ namespace proton { namespace { -class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget +class ShrinkSummaryLidSpaceFlushTarget : public ShrinkLidSpaceFlushTarget { using ICompactableLidSpace = search::common::ICompactableLidSpace; vespalib::Executor & _summaryService; diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp index db3d8adfe26..a712035e9af 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp @@ -150,7 +150,7 @@ DocumentMetaStoreFlushTarget:: DocumentMetaStoreFlushTarget(const DocumentMetaStore::SP dms, ITlsSyncer &tlsSyncer, const vespalib::string & baseDir, const TuneFileAttributes &tuneFileAttributes, const FileHeaderContext &fileHeaderContext, const HwInfo &hwInfo) - : LeafFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE), + : IFlushTarget("documentmetastore.flush", Type::SYNC, Component::ATTRIBUTE), _dms(dms), _tlsSyncer(tlsSyncer), _baseDir(baseDir), diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h index 5a3a671553c..777d7de1e17 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h @@ -19,7 +19,7 @@ class TransientResourceUsage; /** * Implementation of IFlushTarget interface for document meta store. **/ -class DocumentMetaStoreFlushTarget : public searchcorespi::LeafFlushTarget +class DocumentMetaStoreFlushTarget : public searchcorespi::IFlushTarget { private: /** @@ -29,16 +29,16 @@ private: using DocumentMetaStoreSP = std::shared_ptr<DocumentMetaStore>; using FlushStats = searchcorespi::FlushStats; - DocumentMetaStoreSP _dms; - ITlsSyncer &_tlsSyncer; - vespalib::string _baseDir; - bool _cleanUpAfterFlush; - FlushStats _lastStats; - const search::TuneFileAttributes _tuneFileAttributes; + DocumentMetaStoreSP _dms; + ITlsSyncer &_tlsSyncer; + vespalib::string _baseDir; + bool _cleanUpAfterFlush; + FlushStats _lastStats; + const search::TuneFileAttributes _tuneFileAttributes; const search::common::FileHeaderContext &_fileHeaderContext; - HwInfo _hwInfo; - std::shared_ptr<AttributeDiskLayout> _diskLayout; - std::shared_ptr<AttributeDirectory> _dmsDir; + HwInfo _hwInfo; + std::shared_ptr<AttributeDiskLayout> _diskLayout; + std::shared_ptr<AttributeDirectory> _dmsDir; public: using SP = std::shared_ptr<DocumentMetaStoreFlushTarget>; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp index 9f212006234..3947ac1d07d 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.cpp @@ -11,10 +11,18 @@ CachedFlushTarget::CachedFlushTarget(const IFlushTarget::SP &target) _lastFlushTime(target->getLastFlushTime()), _memoryGain(target->getApproxMemoryGain()), _diskGain(target->getApproxDiskGain()), - _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk()), - _replay_operation_cost(target->get_replay_operation_cost()), _needUrgentFlush(target->needUrgentFlush()), - _priority(target->getPriority()) -{ } + _approxBytesToWriteToDisk(target->getApproxBytesToWriteToDisk()) +{ + // empty +} + + +uint64_t +CachedFlushTarget::getApproxBytesToWriteToDisk() const +{ + return _approxBytesToWriteToDisk; +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h index a43c7c16217..1bd51e9ab11 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h @@ -19,10 +19,8 @@ private: Time _lastFlushTime; MemoryGain _memoryGain; DiskGain _diskGain; - uint64_t _approxBytesToWriteToDisk; - double _replay_operation_cost; bool _needUrgentFlush; - Priority _priority; + uint64_t _approxBytesToWriteToDisk; public: /** @@ -49,15 +47,13 @@ public: SerialNum getFlushedSerialNum() const override { return _flushedSerialNum; } Time getLastFlushTime() const override { return _lastFlushTime; } bool needUrgentFlush() const override { return _needUrgentFlush; } - Priority getPriority() const override { return _priority; } - double get_replay_operation_cost() const override { return _replay_operation_cost; } Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { return _target->initFlush(currentSerial, std::move(flush_token)); } FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } - uint64_t getApproxBytesToWriteToDisk() const override { return _approxBytesToWriteToDisk; } + uint64_t getApproxBytesToWriteToDisk() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 772216e7f37..3e577bf6cbe 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -128,7 +128,8 @@ FlushEngine::close() if (_thread.joinable()) { _thread.join(); } - _executor.shutdown().sync(); + _executor.shutdown(); + _executor.sync(); return *this; } @@ -157,61 +158,36 @@ bool FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune) { std::unique_lock<std::mutex> guard(_lock); - if (canFlushMore(guard) && _pendingPrune.empty()) { + if ( (minimumWaitTimeIfReady != vespalib::duration::zero()) && canFlushMore(guard) && _pendingPrune.empty()) { _cond.wait_for(guard, minimumWaitTimeIfReady); } while ( ! canFlushMore(guard) && ( ignorePendingPrune || _pendingPrune.empty())) { _cond.wait_for(guard, 1s); // broadcast when flush done } - return !_closed.load(std::memory_order_relaxed); -} - -void -FlushEngine::wait_for_slot(IFlushTarget::Priority priority) -{ - (void) priority; - std::unique_lock<std::mutex> guard(_lock); - while ( ! canFlushMore(guard) && _pendingPrune.empty()) { - _cond.wait_for(guard, 1s); // broadcast when flush done - } -} - -std::pair<bool, vespalib::string> -FlushEngine::checkAndFlush(vespalib::string prev) { - std::pair<FlushContext::List, bool> lst = getSortedTargetList(); - if (lst.second) { - // Everything returned from a priority strategy should be flushed - flushAll(lst.first); - } else if ( ! lst.first.empty()) { - wait_for_slot(lst.first[0]->getTarget()->getPriority()); - prev = flushNextTarget(prev, lst.first); - if (!prev.empty()) { - // Sleep 1 ms after a successful flush in order to avoid busy loop in case - // of strategy or target error. - std::this_thread::sleep_for(1ms); - return {false, prev}; - } - } - return {true, ""}; + return !_closed; } void FlushEngine::run() { _has_thread = true; + bool shouldIdle = false; vespalib::string prevFlushName; - for (vespalib::duration idleInterval=vespalib::duration::zero(); !_closed.load(std::memory_order_relaxed); idleInterval = _idleInterval) { - LOG(debug, "Making another check for something to flush, last was '%s'", prevFlushName.c_str()); + while (wait(shouldIdle ? _idleInterval : vespalib::duration::zero(), false)) { + shouldIdle = false; if (prune()) { - // Prune attempted on one or more handlers - wait_for_slot(IFlushTarget::Priority::NORMAL); + continue; // Prune attempted on one or more handlers + } + prevFlushName = flushNextTarget(prevFlushName); + if ( ! prevFlushName.empty()) { + // Sleep 1 ms after a successful flush in order to avoid busy loop in case + // of strategy or target error. + std::this_thread::sleep_for(1ms); } else { - auto [needWait, name] = checkAndFlush(prevFlushName); - prevFlushName = name; - if (needWait) { - wait(idleInterval); - } + shouldIdle = true; } + LOG(debug, "Making another wait(idle=%s, timeS=%1.3f) last was '%s'", + shouldIdle ? "true" : "false", shouldIdle ? vespalib::to_s(_idleInterval) : 0, prevFlushName.c_str()); } _executor.sync(); prune(); @@ -361,21 +337,27 @@ FlushEngine::flushAll(const FlushContext::List &lst) } } } - _executor.sync(); - prune(); - std::lock_guard<std::mutex> strategyGuard(_strategyLock); - _priorityStrategy.reset(); - _strategyCond.notify_all(); } vespalib::string -FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts) +FlushEngine::flushNextTarget(const vespalib::string & name) { - if (contexts.empty()) { + std::pair<FlushContext::List,bool> lst = getSortedTargetList(); + if (lst.second) { + // Everything returned from a priority strategy should be flushed + flushAll(lst.first); + _executor.sync(); + prune(); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); + _priorityStrategy.reset(); + _strategyCond.notify_all(); + return ""; + } + if (lst.first.empty()) { LOG(debug, "No target to flush."); return ""; } - FlushContext::SP ctx = initNextFlush(contexts); + FlushContext::SP ctx = initNextFlush(lst.first); if ( ! ctx) { LOG(debug, "All targets refused to flush."); return ""; @@ -383,7 +365,7 @@ FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext:: if ( name == ctx->getName()) { LOG(info, "The same target %s out of %ld has been asked to flush again. " "This might indicate flush logic flaw so I will wait 100 ms before doing it.", - name.c_str(), contexts.size()); + name.c_str(), lst.first.size()); std::this_thread::sleep_for(100ms); } _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); @@ -463,6 +445,7 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP { std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; + vespalib::string name(FlushContext::createName(*handler, *target)); FlushInfo flush(taskId, handler->getName(), target); _flushing[taskId] = flush; } @@ -476,7 +459,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) { std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); std::unique_lock<std::mutex> strategyGuard(_strategyLock); - if (_closed.load(std::memory_order_relaxed)) { + if (_closed) { return; } assert(!_priorityStrategy); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 06ce34bb831..1d6ed763ff6 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -50,7 +50,7 @@ private: }; using FlushMap = std::map<uint32_t, FlushInfo>; using FlushHandlerMap = HandlerMap<IFlushHandler>; - std::atomic<bool> _closed; + bool _closed; const uint32_t _maxConcurrent; const vespalib::duration _idleInterval; uint32_t _taskId; @@ -75,20 +75,15 @@ private: std::pair<FlushContext::List,bool> getSortedTargetList(); std::shared_ptr<search::IFlushToken> get_flush_token(const FlushContext& ctx); FlushContext::SP initNextFlush(const FlushContext::List &lst); - vespalib::string flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts); + vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); bool prune(); uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); bool canFlushMore(const std::unique_lock<std::mutex> &guard) const; - void wait_for_slot(IFlushTarget::Priority priority); bool wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune); - void wait(vespalib::duration minimumWaitTimeIfReady) { - wait(minimumWaitTimeIfReady, false); - } bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const; - std::pair<bool, vespalib::string> checkAndFlush(vespalib::string prev); friend class FlushTask; friend class FlushEngineExplorer; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp index b62a9191625..eb35e2b2eb1 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp @@ -22,10 +22,61 @@ FlushTargetProxy::FlushTargetProxy(const IFlushTarget::SP &target, { } + +IFlushTarget::MemoryGain +FlushTargetProxy::getApproxMemoryGain() const +{ + return _target->getApproxMemoryGain(); +} + + +IFlushTarget::DiskGain +FlushTargetProxy::getApproxDiskGain() const +{ + return _target->getApproxDiskGain(); +} + + +IFlushTarget::SerialNum +FlushTargetProxy::getFlushedSerialNum() const +{ + return _target->getFlushedSerialNum(); +} + + +IFlushTarget::Time +FlushTargetProxy::getLastFlushTime() const +{ + return _target->getLastFlushTime(); +} + + +bool +FlushTargetProxy::needUrgentFlush() const +{ + return _target->needUrgentFlush(); +} + + IFlushTarget::Task::UP FlushTargetProxy::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { return _target->initFlush(currentSerial, std::move(flush_token)); } + +FlushStats +FlushTargetProxy::getLastFlushStats() const +{ + return _target->getLastFlushStats(); +} + + +uint64_t +FlushTargetProxy::getApproxBytesToWriteToDisk() const +{ + return _target->getApproxBytesToWriteToDisk(); +} + + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h index e2ac832e023..0967ca1d6c1 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h @@ -41,16 +41,14 @@ public: */ const IFlushTarget::SP & getFlushTarget() const { return _target; } // Implements IFlushTarget. - MemoryGain getApproxMemoryGain() const override { return _target->getApproxMemoryGain(); } - DiskGain getApproxDiskGain() const override { return _target->getApproxDiskGain(); } - SerialNum getFlushedSerialNum() const override { return _target->getFlushedSerialNum(); } - Time getLastFlushTime() const override { return _target->getLastFlushTime(); } - bool needUrgentFlush() const override { return _target->needUrgentFlush(); } - Priority getPriority() const override { return _target->getPriority(); } - uint64_t getApproxBytesToWriteToDisk() const override { return _target->getApproxBytesToWriteToDisk(); } - searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } - double get_replay_operation_cost() const override { return _target->get_replay_operation_cost(); } + MemoryGain getApproxMemoryGain() const override; + DiskGain getApproxDiskGain() const override; + SerialNum getFlushedSerialNum() const override; + Time getLastFlushTime() const override; + bool needUrgentFlush() const override; Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; + searchcorespi::FlushStats getLastFlushStats() const override; + uint64_t getApproxBytesToWriteToDisk() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp index d292f8347a7..b824f2a7c88 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp @@ -6,7 +6,6 @@ namespace proton { using searchcorespi::IFlushTarget; -using searchcorespi::LeafFlushTarget; using searchcorespi::FlushStats; using searchcorespi::FlushTask; @@ -47,7 +46,7 @@ ShrinkLidSpaceFlushTarget::ShrinkLidSpaceFlushTarget(const vespalib::string &nam SerialNum flushedSerialNum, Time lastFlushTime, std::shared_ptr<ICompactableLidSpace> target) - : LeafFlushTarget(name, type, component), + : IFlushTarget(name, type, component), _target(std::move(target)), _flushedSerialNum(flushedSerialNum), @@ -81,6 +80,12 @@ ShrinkLidSpaceFlushTarget::getLastFlushTime() const return _lastFlushTime; } +bool +ShrinkLidSpaceFlushTarget::needUrgentFlush() const +{ + return false; +} + IFlushTarget::Task::UP ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h index 13f221e40a7..e3efbd9b273 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h @@ -11,7 +11,7 @@ namespace proton { /** * Implements a flush target that shrinks lid space in target. */ -class ShrinkLidSpaceFlushTarget : public searchcorespi::LeafFlushTarget +class ShrinkLidSpaceFlushTarget : public searchcorespi::IFlushTarget { /** * Task representing that shrinking has been performed. @@ -46,6 +46,7 @@ public: DiskGain getApproxDiskGain() const override; SerialNum getFlushedSerialNum() const override; Time getLastFlushTime() const override; + bool needUrgentFlush() const override; Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; searchcorespi::FlushStats getLastFlushStats() const override; uint64_t getApproxBytesToWriteToDisk() const override; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp index 852ed73dc74..5b49c724c6f 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp @@ -30,4 +30,10 @@ JobTrackedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search return FlushTask::UP(); } +uint64_t +JobTrackedFlushTarget::getApproxBytesToWriteToDisk() const +{ + return _target->getApproxBytesToWriteToDisk(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h index c09c1fac055..35d1b0b0b12 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h @@ -19,7 +19,7 @@ private: public: JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<searchcorespi::IFlushTarget> target); - ~JobTrackedFlushTarget() override; + ~JobTrackedFlushTarget(); const IJobTracker &getTracker() const { return *_tracker; } const searchcorespi::IFlushTarget &getTarget() const { return *_target; } @@ -40,18 +40,13 @@ public: bool needUrgentFlush() const override { return _target->needUrgentFlush(); } - double get_replay_operation_cost() const override { - return _target->get_replay_operation_cost(); - } - Priority getPriority() const override { return _target->getPriority(); } searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } - uint64_t getApproxBytesToWriteToDisk() const override { - return _target->getApproxBytesToWriteToDisk(); - } + uint64_t getApproxBytesToWriteToDisk() const override; }; } // namespace proton + diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp index 29b31dd2add..8915e3b367c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp @@ -5,10 +5,10 @@ namespace proton::test { DummyFlushTarget::DummyFlushTarget(const vespalib::string &name) noexcept - : searchcorespi::LeafFlushTarget(name, Type::OTHER, Component::OTHER) + : searchcorespi::IFlushTarget(name) {} DummyFlushTarget::DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept - : searchcorespi::LeafFlushTarget(name, type, component) + : searchcorespi::IFlushTarget(name, type, component) {} DummyFlushTarget::~DummyFlushTarget() = default; diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h index 6b261f7bc4e..a9206233c9d 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h @@ -5,7 +5,7 @@ namespace proton::test { -struct DummyFlushTarget : public searchcorespi::LeafFlushTarget +struct DummyFlushTarget : public searchcorespi::IFlushTarget { DummyFlushTarget(const vespalib::string &name) noexcept; DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept; @@ -14,6 +14,7 @@ struct DummyFlushTarget : public searchcorespi::LeafFlushTarget DiskGain getApproxDiskGain() const override { return DiskGain(0, 0); } SerialNum getFlushedSerialNum() const override { return 0; } Time getLastFlushTime() const override { return Time(); } + bool needUrgentFlush() const override { return false; } searchcorespi::FlushTask::UP initFlush(SerialNum, std::shared_ptr<search::IFlushToken>) override { return searchcorespi::FlushTask::UP(); } diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp index b31113e1abc..d821e06a2a3 100644 --- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.cpp @@ -16,8 +16,4 @@ IFlushTarget::IFlushTarget(const vespalib::string &name, const Type &type, const IFlushTarget::~IFlushTarget() = default; -LeafFlushTarget::LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept - : IFlushTarget(name, type, component) -{} - } diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h index 9e960757115..e3ddf98ce9f 100644 --- a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h +++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h @@ -38,11 +38,6 @@ public: OTHER }; - enum class Priority { - NORMAL = 50, - HIGH = 100 - }; - private: vespalib::string _name; Type _type; @@ -138,7 +133,7 @@ public: /** * Return cost of replaying a feed operation relative to cost of reading a feed operation from tls. */ - virtual double get_replay_operation_cost() const = 0; + virtual double get_replay_operation_cost() const { return 0.0; } /** * Returns the last serial number for the transaction applied to @@ -161,10 +156,7 @@ public: * * @return true if an urgent flush is needed */ - virtual bool needUrgentFlush() const = 0; - - /// Returns a priority for this target - virtual Priority getPriority() const = 0; + virtual bool needUrgentFlush() const { return false; } /** * Initiates the flushing of temporary memory. This method must perform @@ -183,14 +175,7 @@ public: * @return The stats for the last flush. */ virtual FlushStats getLastFlushStats() const = 0; -}; -class LeafFlushTarget : public IFlushTarget { -public: - LeafFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept; - bool needUrgentFlush() const override { return false; } - Priority getPriority() const override { return Priority::NORMAL; } - double get_replay_operation_cost() const override { return 0.0; } }; } // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp index b5a5e2c2843..53fb21bf1ed 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -10,7 +10,7 @@ LOG_SETUP(".searchcorespi.index.indexflushtarget"); namespace searchcorespi::index { IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats) - : LeafFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), + : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), _indexMaintainer(indexMaintainer), _flushStats(flushStats), _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()), diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h index 9f524bc341d..2b9ecc9574b 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h @@ -9,7 +9,7 @@ namespace searchcorespi::index { /** * Flush target for flushing a memory index in an IndexMaintainer. **/ -class IndexFlushTarget : public LeafFlushTarget { +class IndexFlushTarget : public IFlushTarget { private: IndexMaintainer &_indexMaintainer; const IndexMaintainer::FlushStats _flushStats; @@ -29,7 +29,6 @@ public: Time getLastFlushTime() const override; bool needUrgentFlush() const override; - Priority getPriority() const override { return Priority::HIGH; } Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; FlushStats getLastFlushStats() const override { return _lastStats; } diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp index 562d49a4348..6755976939b 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -38,7 +38,7 @@ public: } IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer) - : LeafFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), + : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), _indexMaintainer(indexMaintainer), _fusionStats(indexMaintainer.getFusionStats()), _lastStats() diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h index 2be7bcc33a9..7a9f44e6612 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -9,7 +9,7 @@ namespace searchcorespi::index { /** * Flush target for doing fusion on disk indexes in an IndexMaintainer. **/ -class IndexFusionTarget : public LeafFlushTarget { +class IndexFusionTarget : public IFlushTarget { private: IndexMaintainer &_indexMaintainer; IndexMaintainer::FusionStats _fusionStats; |