diff options
10 files changed, 140 insertions, 65 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java index 69e9a0e8585..2575cdb6237 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java @@ -59,12 +59,14 @@ public class Admin extends AbstractConfigProducer implements Serializable { Monitoring monitoring, Metrics metrics, Map<String, MetricsConsumer> legacyMetricsConsumers, - boolean multitenant) { + boolean multitenant, + FileDistributionConfigProducer fileDistributionConfigProducer) { super(parent, "admin"); this.monitoring = monitoring; this.metrics = metrics; this.legacyMetricsConsumers = legacyMetricsConsumers; this.multitenant = multitenant; + this.fileDistribution = fileDistributionConfigProducer; } public Configserver getConfigserver() { @@ -148,10 +150,6 @@ public class Admin extends AbstractConfigProducer implements Serializable { zooKeepersConfigProvider.getConfig(builder); } - public void setFileDistribution(FileDistributionConfigProducer fileDistribution) { - this.fileDistribution = fileDistribution; - } - public FileDistributionConfigProducer getFileDistributionConfigProducer() { return fileDistribution; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java index c2ba2efac71..24e56472688 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java @@ -67,25 +67,26 @@ public abstract class DomAdminBuilderBase extends VespaDomBuilder.DomConfigProdu @Override protected Admin doBuild(AbstractConfigProducer parent, Element adminElement) { Monitoring monitoring = getMonitoring(getChildWithFallback(adminElement, "monitoring", "yamas")); - Metrics metrics = new MetricsBuilder(applicationType, predefinedMetricSets) .buildMetrics(XML.getChild(adminElement, "metrics")); Map<String, MetricsConsumer> legacyMetricsConsumers = DomMetricBuilderHelper .buildMetricsConsumers(XML.getChild(adminElement, "metric-consumers")); + FileDistributionConfigProducer fileDistributionConfigProducer = getFileDistributionConfigProducer(parent, adminElement); - Admin admin = new Admin(parent, monitoring, metrics, legacyMetricsConsumers, multitenant); - + Admin admin = new Admin(parent, monitoring, metrics, legacyMetricsConsumers, multitenant, fileDistributionConfigProducer); doBuildAdmin(admin, adminElement); - new ModelConfigProvider(admin); + return admin; + } + + private FileDistributionConfigProducer getFileDistributionConfigProducer(AbstractConfigProducer parent, Element adminElement) { FileDistributionOptions fileDistributionOptions = FileDistributionOptions.defaultOptions(); fileDistributionOptions.disableFiledistributor(disableFiledistributor); fileDistributionOptions = new DomFileDistributionOptionsBuilder(fileDistributionOptions).build(XML.getChild(adminElement, "filedistribution")); - admin.setFileDistribution(new FileDistributionConfigProducer.Builder(fileDistributionOptions).build(parent, fileRegistry)); - return admin; + return new FileDistributionConfigProducer.Builder(fileDistributionOptions).build(parent, fileRegistry); } - + private Element getChildWithFallback(Element parent, String childName, String alternativeChildName) { Element child = XML.getChild(parent, childName); if (child != null) return child; diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java b/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java index 3a2633ed7b7..e557e3674b5 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.model.content.utils; import com.yahoo.config.application.api.ApplicationPackage; import com.yahoo.config.model.ConfigModelContext; import com.yahoo.config.model.api.HostProvisioner; +import com.yahoo.config.model.application.provider.MockFileRegistry; import com.yahoo.config.model.deploy.DeployState; import com.yahoo.config.model.provision.InMemoryProvisioner; import com.yahoo.config.model.provision.SingleNodeProvisioner; @@ -11,9 +12,11 @@ import com.yahoo.config.model.test.MockApplicationPackage; import com.yahoo.config.model.test.MockRoot; import com.yahoo.text.XML; import com.yahoo.vespa.model.admin.Admin; +import com.yahoo.vespa.model.admin.FileDistributionOptions; import com.yahoo.vespa.model.admin.monitoring.DefaultMonitoring; import com.yahoo.vespa.model.admin.monitoring.builder.Metrics; import com.yahoo.vespa.model.content.cluster.ContentCluster; +import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProducer; import org.w3c.dom.Document; import java.util.Collections; @@ -57,7 +60,8 @@ public class ContentClusterUtils { public static ContentCluster createCluster(String clusterXml, MockRoot root) throws Exception { Document doc = XML.getDocument(clusterXml); - Admin admin = new Admin(root, new DefaultMonitoring("vespa", 60), new Metrics(), Collections.emptyMap(), false); + Admin admin = new Admin(root, new DefaultMonitoring("vespa", 60), new Metrics(), Collections.emptyMap(), false, + new FileDistributionConfigProducer.Builder(FileDistributionOptions.defaultOptions()).build(root, new MockFileRegistry())); ConfigModelContext context = ConfigModelContext.create(null, root.getDeployState(), null, root, null); return new ContentCluster.Builder(admin).build(Collections.emptyList(), context, doc.getDocumentElement()); diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp index 5e8537b657f..985a970a380 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp @@ -13,6 +13,7 @@ #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/test/insertion_operators.h> +#include <chrono> #include <vespa/log/log.h> LOG_SETUP("flushengine_test"); @@ -393,13 +394,12 @@ public: } }; -class ConstantFlushStrategy : public SimpleStrategy { -public: - uint64_t _millis; - -public: - ConstantFlushStrategy(uint64_t millis) : SimpleStrategy(), _millis(millis) { } - typedef std::shared_ptr<ConstantFlushStrategy> SP; +class NoFlushStrategy : public SimpleStrategy +{ + virtual FlushContext::List getFlushTargets(const FlushContext::List &, + const flushengine::TlsStatsMap &) const override { + return FlushContext::List(); + } }; // -------------------------------------------------------------------------------- @@ -437,12 +437,38 @@ struct Fixture SimpleStrategy::SP strategy; FlushEngine engine; - Fixture(uint32_t numThreads, uint32_t idleIntervalMS) + Fixture(uint32_t numThreads, uint32_t idleIntervalMS, SimpleStrategy::SP strategy_) : tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()), - strategy(std::make_shared<SimpleStrategy>()), + strategy(strategy_), engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS) { } + + Fixture(uint32_t numThreads, uint32_t idleIntervalMS) + : Fixture(numThreads, idleIntervalMS, std::make_shared<SimpleStrategy>()) + { + } + + std::shared_ptr<SimpleHandler> + addSimpleHandler(Targets targets) + { + auto handler = std::make_shared<SimpleHandler>(targets, "handler", 20); + engine.putFlushHandler(DocTypeName("handler"), handler); + engine.start(); + return handler; + } + + void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial) + { + using namespace std::chrono_literals; + for (int pass = 0; pass < 600; ++pass) { + std::this_thread::sleep_for(100ms); + if (handler._oldestSerial == expOldestSerial) { + break; + } + } + EXPECT_EQUAL(expOldestSerial, handler._oldestSerial); + } }; @@ -717,6 +743,26 @@ TEST_F("require that state explorer can list flush targets", Fixture(1, 1)) target->_taskDone.await(LONG_TIMEOUT); } +TEST_F("require that oldest serial is updated when closing engine", Fixture(1, 100)) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 10, false); + auto handler = f.addSimpleHandler({ target1 }); + TEST_DO(f.assertOldestSerial(*handler, 10)); + target1->_proceed.countDown(); + f.engine.close(); + EXPECT_EQUAL(20u, handler->_oldestSerial); +} + +TEST_F("require that oldest serial is updated when finishing priority flush strategy", Fixture(1, 100, std::make_shared<NoFlushStrategy>())) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 10, true); + auto handler = f.addSimpleHandler({ target1 }); + TEST_DO(f.assertOldestSerial(*handler, 10)); + f.engine.setStrategy(std::make_shared<SimpleStrategy>()); + EXPECT_EQUAL(20u, handler->_oldestSerial); +} + + TEST_MAIN() { TEST_RUN_ALL(); diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp index 49f70092d29..f3afab96cf5 100644 --- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp @@ -303,4 +303,18 @@ EventLogger::loadDocumentStoreComplete(const vespalib::string &subDbName, int64_ loadComponentComplete(subDbName, "documentstore", elapsedTimeMs); } +void +EventLogger::transactionLogPruneComplete(const string &domainName, SerialNum prunedSerial) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("domain").appendString(domainName); + jstr.appendKey("serialnum") + .beginObject() + .appendKey("pruned").appendInt64(prunedSerial) + .endObject(); + jstr.endObject(); + EV_STATE("transactionlog.prune.complete", jstr.toString().c_str()); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h index ab3793eb677..6ba8852496e 100644 --- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h +++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h @@ -50,6 +50,7 @@ public: static void loadDocumentMetaStoreComplete(const vespalib::string &subDbName, int64_t elapsedTimeMs); static void loadDocumentStoreStart(const vespalib::string &subDbName); static void loadDocumentStoreComplete(const vespalib::string &subDbName, int64_t elapsedTimeMs); + static void transactionLogPruneComplete(const string &domainName, SerialNum prunedSerial); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index fb25f8cf161..95b3008985b 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -12,10 +12,10 @@ #include <vespa/log/log.h> LOG_SETUP(".proton.flushengine.flushengine"); -using vespalib::MonitorGuard; typedef vespalib::Executor::Task Task; using searchcorespi::IFlushTarget; using searchcorespi::FlushStats; +using namespace std::chrono_literals; namespace proton { @@ -71,11 +71,13 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> _strategy(strategy), _priorityStrategy(), _executor(numThreads, 128 * 1024), - _monitor(), + _lock(), + _cond(), _handlers(), _flushing(), + _setStrategyLock(), _strategyLock(), - _strategyMonitor(), + _strategyCond(), _tlsStatsFactory(tlsStatsFactory), _pendingPrune() { @@ -100,10 +102,10 @@ FlushEngine & FlushEngine::close() { { - MonitorGuard strategyGuard(_strategyMonitor); - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); + std::lock_guard<std::mutex> guard(_lock); _closed = true; - guard.broadcast(); + _cond.notify_all(); } _threadPool.Close(); _executor.shutdown(); @@ -120,13 +122,13 @@ FlushEngine::triggerFlush() void FlushEngine::kick() { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); LOG(debug, "Kicking flush engine"); - guard.broadcast(); + _cond.notify_all(); } bool -FlushEngine::canFlushMore(const MonitorGuard & guard) const +FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const { (void) guard; return _maxConcurrent > _flushing.size(); @@ -135,12 +137,12 @@ FlushEngine::canFlushMore(const MonitorGuard & guard) const bool FlushEngine::wait(size_t minimumWaitTimeIfReady) { - MonitorGuard guard(_monitor); + std::unique_lock<std::mutex> guard(_lock); if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) { - guard.wait(minimumWaitTimeIfReady); + _cond.wait_for(guard, std::chrono::milliseconds(minimumWaitTimeIfReady)); } while ( ! canFlushMore(guard) && _pendingPrune.empty()) { - guard.wait(1000); // broadcast when flush done + _cond.wait_for(guard, 1s); // broadcast when flush done } return !_closed; } @@ -167,6 +169,8 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg) } LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str()); } + _executor.sync(); + prune(); } bool @@ -174,7 +178,7 @@ FlushEngine::prune() { std::set<IFlushHandler::SP> toPrune; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); if (_pendingPrune.empty()) { return false; } @@ -187,7 +191,7 @@ FlushEngine::prune() return true; } -bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const +bool FlushEngine::isFlushing(const std::lock_guard<std::mutex> & guard, const vespalib::string & name) const { (void) guard; for(const auto & it : _flushing) { @@ -203,7 +207,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const { FlushContext::List ret; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); for (const auto & it : _handlers) { IFlushHandler & handler(*it.second); search::SerialNum serial(handler.getCurrentSerialNumber()); @@ -227,12 +231,12 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const } std::pair<FlushContext::List,bool> -FlushEngine::getSortedTargetList(MonitorGuard &strategyGuard) const +FlushEngine::getSortedTargetList() { - (void) strategyGuard; FlushContext::List unsortedTargets = getTargetList(false); - std::pair<FlushContext::List, bool> ret; flushengine::TlsStatsMap tlsStatsMap(_tlsStatsFactory->create()); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); + std::pair<FlushContext::List, bool> ret; if (_priorityStrategy) { ret = std::make_pair(_priorityStrategy->getFlushTargets(unsortedTargets, tlsStatsMap), true); } else { @@ -291,14 +295,15 @@ FlushEngine::flushAll(const FlushContext::List &lst) vespalib::string FlushEngine::flushNextTarget(const vespalib::string & name) { - MonitorGuard strategyGuard(_strategyMonitor); - std::pair<FlushContext::List,bool> lst = getSortedTargetList(strategyGuard); + std::pair<FlushContext::List,bool> lst = getSortedTargetList(); if (lst.second) { // Everything returned from a priority strategy should be flushed flushAll(lst.first); _executor.sync(); + prune(); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); _priorityStrategy.reset(); - strategyGuard.broadcast(); + _strategyCond.notify_all(); return ""; } if (lst.first.empty()) { @@ -340,7 +345,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) { fastos::TimeStamp duration; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); duration = fastos::TimeStamp(fastos::ClockSystem::now()) - _flushing[taskId].getStart(); } if (LOG_WOULD_LOG(event)) { @@ -351,20 +356,20 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) stats.getPathElementsToLog()); } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec()); - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); _flushing.erase(taskId); assert(ctx.getHandler()); if (_handlers.hasHandler(ctx.getHandler())) { _pendingPrune.insert(ctx.getHandler()); } - guard.broadcast(); + _cond.notify_all(); } IFlushHandler::SP FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler) { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler)); if (result) { _pendingPrune.erase(result); @@ -376,14 +381,14 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, IFlushHandler::SP FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); return _handlers.getHandler(docTypeName); } IFlushHandler::SP FlushEngine::removeFlushHandler(const DocTypeName &docTypeName) { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.removeHandler(docTypeName)); _pendingPrune.erase(result); return std::move(result); @@ -393,7 +398,7 @@ FlushEngine::FlushMetaSet FlushEngine::getCurrentlyFlushingSet() const { FlushMetaSet s; - vespalib::LockGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); for (const auto & it : _flushing) { s.insert(it.second); } @@ -405,7 +410,7 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP { uint32_t taskId(0); { - vespalib::LockGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; vespalib::string name(FlushContext::createName(*handler, *target)); FlushInfo flush(taskId, target, name); @@ -419,19 +424,19 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP void FlushEngine::setStrategy(IFlushStrategy::SP strategy) { - vespalib::LockGuard strategyLock(_strategyLock); - MonitorGuard strategyGuard(_strategyMonitor); + std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); + std::unique_lock<std::mutex> strategyGuard(_strategyLock); if (_closed) { return; } assert(!_priorityStrategy); _priorityStrategy = strategy; { - MonitorGuard guard(_monitor); - guard.broadcast(); + std::lock_guard<std::mutex> guard(_lock); + _cond.notify_all(); } while (_priorityStrategy) { - strategyGuard.wait(); + _strategyCond.wait(strategyGuard); } } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 7f4698610c8..19175f9ce2a 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -5,10 +5,11 @@ #include "iflushstrategy.h" #include <vespa/searchcore/proton/common/handlermap.hpp> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> #include <set> +#include <mutex> +#include <condition_variable> namespace proton { @@ -53,16 +54,18 @@ private: IFlushStrategy::SP _strategy; mutable IFlushStrategy::SP _priorityStrategy; vespalib::ThreadStackExecutor _executor; - vespalib::Monitor _monitor; + mutable std::mutex _lock; + std::condition_variable _cond; FlushHandlerMap _handlers; FlushMap _flushing; - vespalib::Lock _strategyLock; // serialize setStrategy calls - vespalib::Monitor _strategyMonitor; + std::mutex _setStrategyLock; // serialize setStrategy calls + std::mutex _strategyLock; + std::condition_variable _strategyCond; std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory; std::set<IFlushHandler::SP> _pendingPrune; FlushContext::List getTargetList(bool includeFlushingTargets) const; - std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const; + std::pair<FlushContext::List,bool> getSortedTargetList(); FlushContext::SP initNextFlush(const FlushContext::List &lst); vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); @@ -70,9 +73,9 @@ private: uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); - bool canFlushMore(const vespalib::MonitorGuard & guard) const; + bool canFlushMore(const std::unique_lock<std::mutex> &guard) const; bool wait(size_t minimumWaitTimeIfReady); - bool isFlushing(const vespalib::MonitorGuard & guard, const vespalib::string & name) const; + bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const; friend class FlushTask; friend class FlushEngineExplorer; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h index 421b176bfec..3a24330f8ec 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h @@ -65,7 +65,7 @@ public: /** * This method is called after a flush has been completed. All transactions * up to the given serial number can be pruned from the domain of this - * handler. This method is called by an arbitrary worker thread. + * handler. This method is called by the flush scheduler thread. * * @param flushedSerial Serial number flushed for all flush * targets belonging to this handler. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index bc1715e3c04..2d8368e091b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -11,6 +11,7 @@ #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> +#include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/exceptions.h> @@ -261,6 +262,7 @@ FeedHandler::performPrune(SerialNum flushedSerial) tlsPrune(flushedSerial); // throws on error LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial); _owner.onPerformPrune(flushedSerial); + EventLogger::transactionLogPruneComplete(_tlsMgr.getDomainName(), flushedSerial); } catch (const IllegalStateException & e) { LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what()); } @@ -390,8 +392,9 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu void FeedHandler::flushDone(SerialNum flushedSerial) { - // Called by flush worker thread after performing a flush task + // Called by flush scheduler thread after flush worker thread has completed a flush task _writeService.master().execute(makeLambdaTask([this, flushedSerial]() { performFlushDone(flushedSerial); })); + _writeService.master().sync(); } |