aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java8
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java15
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java6
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine.cpp64
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/eventlogger.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp73
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h17
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp5
10 files changed, 140 insertions, 65 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
index 69e9a0e8585..2575cdb6237 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
@@ -59,12 +59,14 @@ public class Admin extends AbstractConfigProducer implements Serializable {
Monitoring monitoring,
Metrics metrics,
Map<String, MetricsConsumer> legacyMetricsConsumers,
- boolean multitenant) {
+ boolean multitenant,
+ FileDistributionConfigProducer fileDistributionConfigProducer) {
super(parent, "admin");
this.monitoring = monitoring;
this.metrics = metrics;
this.legacyMetricsConsumers = legacyMetricsConsumers;
this.multitenant = multitenant;
+ this.fileDistribution = fileDistributionConfigProducer;
}
public Configserver getConfigserver() {
@@ -148,10 +150,6 @@ public class Admin extends AbstractConfigProducer implements Serializable {
zooKeepersConfigProvider.getConfig(builder);
}
- public void setFileDistribution(FileDistributionConfigProducer fileDistribution) {
- this.fileDistribution = fileDistribution;
- }
-
public FileDistributionConfigProducer getFileDistributionConfigProducer() {
return fileDistribution;
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java
index c2ba2efac71..24e56472688 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomAdminBuilderBase.java
@@ -67,25 +67,26 @@ public abstract class DomAdminBuilderBase extends VespaDomBuilder.DomConfigProdu
@Override
protected Admin doBuild(AbstractConfigProducer parent, Element adminElement) {
Monitoring monitoring = getMonitoring(getChildWithFallback(adminElement, "monitoring", "yamas"));
-
Metrics metrics = new MetricsBuilder(applicationType, predefinedMetricSets)
.buildMetrics(XML.getChild(adminElement, "metrics"));
Map<String, MetricsConsumer> legacyMetricsConsumers = DomMetricBuilderHelper
.buildMetricsConsumers(XML.getChild(adminElement, "metric-consumers"));
+ FileDistributionConfigProducer fileDistributionConfigProducer = getFileDistributionConfigProducer(parent, adminElement);
- Admin admin = new Admin(parent, monitoring, metrics, legacyMetricsConsumers, multitenant);
-
+ Admin admin = new Admin(parent, monitoring, metrics, legacyMetricsConsumers, multitenant, fileDistributionConfigProducer);
doBuildAdmin(admin, adminElement);
-
new ModelConfigProvider(admin);
+ return admin;
+ }
+
+ private FileDistributionConfigProducer getFileDistributionConfigProducer(AbstractConfigProducer parent, Element adminElement) {
FileDistributionOptions fileDistributionOptions = FileDistributionOptions.defaultOptions();
fileDistributionOptions.disableFiledistributor(disableFiledistributor);
fileDistributionOptions = new DomFileDistributionOptionsBuilder(fileDistributionOptions).build(XML.getChild(adminElement, "filedistribution"));
- admin.setFileDistribution(new FileDistributionConfigProducer.Builder(fileDistributionOptions).build(parent, fileRegistry));
- return admin;
+ return new FileDistributionConfigProducer.Builder(fileDistributionOptions).build(parent, fileRegistry);
}
-
+
private Element getChildWithFallback(Element parent, String childName, String alternativeChildName) {
Element child = XML.getChild(parent, childName);
if (child != null) return child;
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java b/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java
index 3a2633ed7b7..e557e3674b5 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/utils/ContentClusterUtils.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.model.content.utils;
import com.yahoo.config.application.api.ApplicationPackage;
import com.yahoo.config.model.ConfigModelContext;
import com.yahoo.config.model.api.HostProvisioner;
+import com.yahoo.config.model.application.provider.MockFileRegistry;
import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.model.provision.InMemoryProvisioner;
import com.yahoo.config.model.provision.SingleNodeProvisioner;
@@ -11,9 +12,11 @@ import com.yahoo.config.model.test.MockApplicationPackage;
import com.yahoo.config.model.test.MockRoot;
import com.yahoo.text.XML;
import com.yahoo.vespa.model.admin.Admin;
+import com.yahoo.vespa.model.admin.FileDistributionOptions;
import com.yahoo.vespa.model.admin.monitoring.DefaultMonitoring;
import com.yahoo.vespa.model.admin.monitoring.builder.Metrics;
import com.yahoo.vespa.model.content.cluster.ContentCluster;
+import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProducer;
import org.w3c.dom.Document;
import java.util.Collections;
@@ -57,7 +60,8 @@ public class ContentClusterUtils {
public static ContentCluster createCluster(String clusterXml, MockRoot root) throws Exception {
Document doc = XML.getDocument(clusterXml);
- Admin admin = new Admin(root, new DefaultMonitoring("vespa", 60), new Metrics(), Collections.emptyMap(), false);
+ Admin admin = new Admin(root, new DefaultMonitoring("vespa", 60), new Metrics(), Collections.emptyMap(), false,
+ new FileDistributionConfigProducer.Builder(FileDistributionOptions.defaultOptions()).build(root, new MockFileRegistry()));
ConfigModelContext context = ConfigModelContext.create(null, root.getDeployState(), null, root, null);
return new ContentCluster.Builder(admin).build(Collections.emptyList(), context, doc.getDocumentElement());
diff --git a/searchcore/src/tests/proton/flushengine/flushengine.cpp b/searchcore/src/tests/proton/flushengine/flushengine.cpp
index 5e8537b657f..985a970a380 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine.cpp
@@ -13,6 +13,7 @@
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/test/insertion_operators.h>
+#include <chrono>
#include <vespa/log/log.h>
LOG_SETUP("flushengine_test");
@@ -393,13 +394,12 @@ public:
}
};
-class ConstantFlushStrategy : public SimpleStrategy {
-public:
- uint64_t _millis;
-
-public:
- ConstantFlushStrategy(uint64_t millis) : SimpleStrategy(), _millis(millis) { }
- typedef std::shared_ptr<ConstantFlushStrategy> SP;
+class NoFlushStrategy : public SimpleStrategy
+{
+ virtual FlushContext::List getFlushTargets(const FlushContext::List &,
+ const flushengine::TlsStatsMap &) const override {
+ return FlushContext::List();
+ }
};
// --------------------------------------------------------------------------------
@@ -437,12 +437,38 @@ struct Fixture
SimpleStrategy::SP strategy;
FlushEngine engine;
- Fixture(uint32_t numThreads, uint32_t idleIntervalMS)
+ Fixture(uint32_t numThreads, uint32_t idleIntervalMS, SimpleStrategy::SP strategy_)
: tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()),
- strategy(std::make_shared<SimpleStrategy>()),
+ strategy(strategy_),
engine(tlsStatsFactory, strategy, numThreads, idleIntervalMS)
{
}
+
+ Fixture(uint32_t numThreads, uint32_t idleIntervalMS)
+ : Fixture(numThreads, idleIntervalMS, std::make_shared<SimpleStrategy>())
+ {
+ }
+
+ std::shared_ptr<SimpleHandler>
+ addSimpleHandler(Targets targets)
+ {
+ auto handler = std::make_shared<SimpleHandler>(targets, "handler", 20);
+ engine.putFlushHandler(DocTypeName("handler"), handler);
+ engine.start();
+ return handler;
+ }
+
+ void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial)
+ {
+ using namespace std::chrono_literals;
+ for (int pass = 0; pass < 600; ++pass) {
+ std::this_thread::sleep_for(100ms);
+ if (handler._oldestSerial == expOldestSerial) {
+ break;
+ }
+ }
+ EXPECT_EQUAL(expOldestSerial, handler._oldestSerial);
+ }
};
@@ -717,6 +743,26 @@ TEST_F("require that state explorer can list flush targets", Fixture(1, 1))
target->_taskDone.await(LONG_TIMEOUT);
}
+TEST_F("require that oldest serial is updated when closing engine", Fixture(1, 100))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 10, false);
+ auto handler = f.addSimpleHandler({ target1 });
+ TEST_DO(f.assertOldestSerial(*handler, 10));
+ target1->_proceed.countDown();
+ f.engine.close();
+ EXPECT_EQUAL(20u, handler->_oldestSerial);
+}
+
+TEST_F("require that oldest serial is updated when finishing priority flush strategy", Fixture(1, 100, std::make_shared<NoFlushStrategy>()))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 10, true);
+ auto handler = f.addSimpleHandler({ target1 });
+ TEST_DO(f.assertOldestSerial(*handler, 10));
+ f.engine.setStrategy(std::make_shared<SimpleStrategy>());
+ EXPECT_EQUAL(20u, handler->_oldestSerial);
+}
+
+
TEST_MAIN()
{
TEST_RUN_ALL();
diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
index 49f70092d29..f3afab96cf5 100644
--- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.cpp
@@ -303,4 +303,18 @@ EventLogger::loadDocumentStoreComplete(const vespalib::string &subDbName, int64_
loadComponentComplete(subDbName, "documentstore", elapsedTimeMs);
}
+void
+EventLogger::transactionLogPruneComplete(const string &domainName, SerialNum prunedSerial)
+{
+ JSONStringer jstr;
+ jstr.beginObject();
+ jstr.appendKey("domain").appendString(domainName);
+ jstr.appendKey("serialnum")
+ .beginObject()
+ .appendKey("pruned").appendInt64(prunedSerial)
+ .endObject();
+ jstr.endObject();
+ EV_STATE("transactionlog.prune.complete", jstr.toString().c_str());
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
index ab3793eb677..6ba8852496e 100644
--- a/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
+++ b/searchcore/src/vespa/searchcore/proton/common/eventlogger.h
@@ -50,6 +50,7 @@ public:
static void loadDocumentMetaStoreComplete(const vespalib::string &subDbName, int64_t elapsedTimeMs);
static void loadDocumentStoreStart(const vespalib::string &subDbName);
static void loadDocumentStoreComplete(const vespalib::string &subDbName, int64_t elapsedTimeMs);
+ static void transactionLogPruneComplete(const string &domainName, SerialNum prunedSerial);
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index fb25f8cf161..95b3008985b 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -12,10 +12,10 @@
#include <vespa/log/log.h>
LOG_SETUP(".proton.flushengine.flushengine");
-using vespalib::MonitorGuard;
typedef vespalib::Executor::Task Task;
using searchcorespi::IFlushTarget;
using searchcorespi::FlushStats;
+using namespace std::chrono_literals;
namespace proton {
@@ -71,11 +71,13 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
_strategy(strategy),
_priorityStrategy(),
_executor(numThreads, 128 * 1024),
- _monitor(),
+ _lock(),
+ _cond(),
_handlers(),
_flushing(),
+ _setStrategyLock(),
_strategyLock(),
- _strategyMonitor(),
+ _strategyCond(),
_tlsStatsFactory(tlsStatsFactory),
_pendingPrune()
{
@@ -100,10 +102,10 @@ FlushEngine &
FlushEngine::close()
{
{
- MonitorGuard strategyGuard(_strategyMonitor);
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
+ std::lock_guard<std::mutex> guard(_lock);
_closed = true;
- guard.broadcast();
+ _cond.notify_all();
}
_threadPool.Close();
_executor.shutdown();
@@ -120,13 +122,13 @@ FlushEngine::triggerFlush()
void
FlushEngine::kick()
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
LOG(debug, "Kicking flush engine");
- guard.broadcast();
+ _cond.notify_all();
}
bool
-FlushEngine::canFlushMore(const MonitorGuard & guard) const
+FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const
{
(void) guard;
return _maxConcurrent > _flushing.size();
@@ -135,12 +137,12 @@ FlushEngine::canFlushMore(const MonitorGuard & guard) const
bool
FlushEngine::wait(size_t minimumWaitTimeIfReady)
{
- MonitorGuard guard(_monitor);
+ std::unique_lock<std::mutex> guard(_lock);
if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) {
- guard.wait(minimumWaitTimeIfReady);
+ _cond.wait_for(guard, std::chrono::milliseconds(minimumWaitTimeIfReady));
}
while ( ! canFlushMore(guard) && _pendingPrune.empty()) {
- guard.wait(1000); // broadcast when flush done
+ _cond.wait_for(guard, 1s); // broadcast when flush done
}
return !_closed;
}
@@ -167,6 +169,8 @@ FlushEngine::Run(FastOS_ThreadInterface *thread, void *arg)
}
LOG(debug, "Making another wait(idle=%s, timeMS=%d) last was '%s'", shouldIdle ? "true" : "false", shouldIdle ? _idleIntervalMS : 0, prevFlushName.c_str());
}
+ _executor.sync();
+ prune();
}
bool
@@ -174,7 +178,7 @@ FlushEngine::prune()
{
std::set<IFlushHandler::SP> toPrune;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
if (_pendingPrune.empty()) {
return false;
}
@@ -187,7 +191,7 @@ FlushEngine::prune()
return true;
}
-bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const
+bool FlushEngine::isFlushing(const std::lock_guard<std::mutex> & guard, const vespalib::string & name) const
{
(void) guard;
for(const auto & it : _flushing) {
@@ -203,7 +207,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
{
FlushContext::List ret;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
for (const auto & it : _handlers) {
IFlushHandler & handler(*it.second);
search::SerialNum serial(handler.getCurrentSerialNumber());
@@ -227,12 +231,12 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
}
std::pair<FlushContext::List,bool>
-FlushEngine::getSortedTargetList(MonitorGuard &strategyGuard) const
+FlushEngine::getSortedTargetList()
{
- (void) strategyGuard;
FlushContext::List unsortedTargets = getTargetList(false);
- std::pair<FlushContext::List, bool> ret;
flushengine::TlsStatsMap tlsStatsMap(_tlsStatsFactory->create());
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
+ std::pair<FlushContext::List, bool> ret;
if (_priorityStrategy) {
ret = std::make_pair(_priorityStrategy->getFlushTargets(unsortedTargets, tlsStatsMap), true);
} else {
@@ -291,14 +295,15 @@ FlushEngine::flushAll(const FlushContext::List &lst)
vespalib::string
FlushEngine::flushNextTarget(const vespalib::string & name)
{
- MonitorGuard strategyGuard(_strategyMonitor);
- std::pair<FlushContext::List,bool> lst = getSortedTargetList(strategyGuard);
+ std::pair<FlushContext::List,bool> lst = getSortedTargetList();
if (lst.second) {
// Everything returned from a priority strategy should be flushed
flushAll(lst.first);
_executor.sync();
+ prune();
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
_priorityStrategy.reset();
- strategyGuard.broadcast();
+ _strategyCond.notify_all();
return "";
}
if (lst.first.empty()) {
@@ -340,7 +345,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
{
fastos::TimeStamp duration;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
duration = fastos::TimeStamp(fastos::ClockSystem::now()) - _flushing[taskId].getStart();
}
if (LOG_WOULD_LOG(event)) {
@@ -351,20 +356,20 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
stats.getPathElementsToLog());
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
_flushing.erase(taskId);
assert(ctx.getHandler());
if (_handlers.hasHandler(ctx.getHandler())) {
_pendingPrune.insert(ctx.getHandler());
}
- guard.broadcast();
+ _cond.notify_all();
}
IFlushHandler::SP
FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
const IFlushHandler::SP &flushHandler)
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler));
if (result) {
_pendingPrune.erase(result);
@@ -376,14 +381,14 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
IFlushHandler::SP
FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
return _handlers.getHandler(docTypeName);
}
IFlushHandler::SP
FlushEngine::removeFlushHandler(const DocTypeName &docTypeName)
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.removeHandler(docTypeName));
_pendingPrune.erase(result);
return std::move(result);
@@ -393,7 +398,7 @@ FlushEngine::FlushMetaSet
FlushEngine::getCurrentlyFlushingSet() const
{
FlushMetaSet s;
- vespalib::LockGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
for (const auto & it : _flushing) {
s.insert(it.second);
}
@@ -405,7 +410,7 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
{
uint32_t taskId(0);
{
- vespalib::LockGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
taskId = _taskId++;
vespalib::string name(FlushContext::createName(*handler, *target));
FlushInfo flush(taskId, target, name);
@@ -419,19 +424,19 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
void
FlushEngine::setStrategy(IFlushStrategy::SP strategy)
{
- vespalib::LockGuard strategyLock(_strategyLock);
- MonitorGuard strategyGuard(_strategyMonitor);
+ std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock);
+ std::unique_lock<std::mutex> strategyGuard(_strategyLock);
if (_closed) {
return;
}
assert(!_priorityStrategy);
_priorityStrategy = strategy;
{
- MonitorGuard guard(_monitor);
- guard.broadcast();
+ std::lock_guard<std::mutex> guard(_lock);
+ _cond.notify_all();
}
while (_priorityStrategy) {
- strategyGuard.wait();
+ _strategyCond.wait(strategyGuard);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 7f4698610c8..19175f9ce2a 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -5,10 +5,11 @@
#include "iflushstrategy.h"
#include <vespa/searchcore/proton/common/handlermap.hpp>
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
#include <set>
+#include <mutex>
+#include <condition_variable>
namespace proton {
@@ -53,16 +54,18 @@ private:
IFlushStrategy::SP _strategy;
mutable IFlushStrategy::SP _priorityStrategy;
vespalib::ThreadStackExecutor _executor;
- vespalib::Monitor _monitor;
+ mutable std::mutex _lock;
+ std::condition_variable _cond;
FlushHandlerMap _handlers;
FlushMap _flushing;
- vespalib::Lock _strategyLock; // serialize setStrategy calls
- vespalib::Monitor _strategyMonitor;
+ std::mutex _setStrategyLock; // serialize setStrategy calls
+ std::mutex _strategyLock;
+ std::condition_variable _strategyCond;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
std::set<IFlushHandler::SP> _pendingPrune;
FlushContext::List getTargetList(bool includeFlushingTargets) const;
- std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const;
+ std::pair<FlushContext::List,bool> getSortedTargetList();
FlushContext::SP initNextFlush(const FlushContext::List &lst);
vespalib::string flushNextTarget(const vespalib::string & name);
void flushAll(const FlushContext::List &lst);
@@ -70,9 +73,9 @@ private:
uint32_t initFlush(const FlushContext &ctx);
uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
void flushDone(const FlushContext &ctx, uint32_t taskId);
- bool canFlushMore(const vespalib::MonitorGuard & guard) const;
+ bool canFlushMore(const std::unique_lock<std::mutex> &guard) const;
bool wait(size_t minimumWaitTimeIfReady);
- bool isFlushing(const vespalib::MonitorGuard & guard, const vespalib::string & name) const;
+ bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const;
friend class FlushTask;
friend class FlushEngineExplorer;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
index 421b176bfec..3a24330f8ec 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/iflushhandler.h
@@ -65,7 +65,7 @@ public:
/**
* This method is called after a flush has been completed. All transactions
* up to the given serial number can be pruned from the domain of this
- * handler. This method is called by an arbitrary worker thread.
+ * handler. This method is called by the flush scheduler thread.
*
* @param flushedSerial Serial number flushed for all flush
* targets belonging to this handler.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index bc1715e3c04..2d8368e091b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -11,6 +11,7 @@
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/transport_latch.h>
+#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -261,6 +262,7 @@ FeedHandler::performPrune(SerialNum flushedSerial)
tlsPrune(flushedSerial); // throws on error
LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial);
_owner.onPerformPrune(flushedSerial);
+ EventLogger::transactionLogPruneComplete(_tlsMgr.getDomainName(), flushedSerial);
} catch (const IllegalStateException & e) {
LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what());
}
@@ -390,8 +392,9 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu
void
FeedHandler::flushDone(SerialNum flushedSerial)
{
- // Called by flush worker thread after performing a flush task
+ // Called by flush scheduler thread after flush worker thread has completed a flush task
_writeService.master().execute(makeLambdaTask([this, flushedSerial]() { performFlushDone(flushedSerial); }));
+ _writeService.master().sync();
}