summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/attribute/attributeflush_test.cpp27
-rw-r--r--searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp3
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp9
-rw-r--r--searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp9
-rw-r--r--searchcore/src/tests/proton/index/fusionrunner_test.cpp15
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp19
-rw-r--r--searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/indexmanager.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp5
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h3
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h5
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp12
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp8
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h4
44 files changed, 122 insertions, 100 deletions
diff --git a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
index adf2f4a7d7d..d98de21ec5e 100644
--- a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/searchlib/attribute/attributefactory.h>
#include <vespa/searchlib/attribute/integerbase.h>
#include <vespa/searchlib/common/indexmetainfo.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/searchlib/test/directory_handler.h>
#include <vespa/vespalib/datastore/datastorebase.h>
@@ -119,7 +120,7 @@ UpdaterTask::startFlushing(uint64_t syncToken, FlushHandler & handler)
handler.gate.reset(new Gate());
IFlushTarget::SP flushable = _am.getFlushable("a1");
LOG(info, "startFlushing(%" PRIu64 ")", syncToken);
- handler.doFlushing(flushable->initFlush(syncToken));
+ handler.doFlushing(flushable->initFlush(syncToken, std::make_shared<search::FlushToken>()));
}
@@ -377,13 +378,13 @@ Test::requireThatFlushableAttributeManagesSyncTokenInfo()
IndexMetaInfo info("flush/a3");
EXPECT_EQUAL(0u, fa->getFlushedSerialNum());
- EXPECT_TRUE(fa->initFlush(0).get() == NULL);
+ EXPECT_TRUE(fa->initFlush(0, std::make_shared<search::FlushToken>()).get() == NULL);
EXPECT_TRUE(!info.load());
av->commit(10, 10); // last sync token = 10
EXPECT_EQUAL(0u, fa->getFlushedSerialNum());
- EXPECT_TRUE(fa->initFlush(10).get() != NULL);
- fa->initFlush(10)->run();
+ EXPECT_TRUE(fa->initFlush(10, std::make_shared<search::FlushToken>()).get() != NULL);
+ fa->initFlush(10, std::make_shared<search::FlushToken>())->run();
EXPECT_EQUAL(10u, fa->getFlushedSerialNum());
EXPECT_TRUE(info.load());
EXPECT_EQUAL(1u, info.snapshots().size());
@@ -392,7 +393,7 @@ Test::requireThatFlushableAttributeManagesSyncTokenInfo()
av->commit(20, 20); // last sync token = 20
EXPECT_EQUAL(10u, fa->getFlushedSerialNum());
- fa->initFlush(20)->run();
+ fa->initFlush(20, std::make_shared<search::FlushToken>())->run();
EXPECT_EQUAL(20u, fa->getFlushedSerialNum());
EXPECT_TRUE(info.load());
EXPECT_EQUAL(1u, info.snapshots().size()); // snapshot 10 removed
@@ -441,7 +442,7 @@ Test::requireThatCleanUpIsPerformedAfterFlush()
FlushableAttribute fa(av, diskLayout->getAttributeDir("a6"), TuneFileAttributes(),
f._fileHeaderContext, f._attributeFieldWriter,
f._hwInfo);
- fa.initFlush(30)->run();
+ fa.initFlush(30, std::make_shared<search::FlushToken>())->run();
EXPECT_TRUE(info.load());
EXPECT_EQUAL(1u, info.snapshots().size()); // snapshots 10 & 20 removed
@@ -462,7 +463,7 @@ Test::requireThatFlushStatsAreUpdated()
av->addDocs(1);
av->commit(100,100);
IFlushTarget::SP ft = am.getFlushable("a7");
- ft->initFlush(101)->run();
+ ft->initFlush(101, std::make_shared<search::FlushToken>())->run();
FlushStats stats = ft->getLastFlushStats();
EXPECT_EQUAL("flush/a7/snapshot-101", stats.getPath());
EXPECT_EQUAL(8u, stats.getPathElementsToLog());
@@ -483,7 +484,7 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime()
for (size_t i = 10; i < 100; ++i) {
av->commit(i, i);
- vespalib::Executor::Task::UP task = ft->initFlush(i);
+ vespalib::Executor::Task::UP task = ft->initFlush(i, std::make_shared<search::FlushToken>());
if (task) {
exec.execute(std::move(task));
}
@@ -520,7 +521,7 @@ Test::requireThatLastFlushTimeIsReported()
AttributeVector::SP av = amf.addAttribute("a9");
IFlushTarget::SP ft = am.getFlushable("a9");
EXPECT_EQUAL(vespalib::system_time(), ft->getLastFlushTime());
- ft->initFlush(200)->run();
+ ft->initFlush(200, std::make_shared<search::FlushToken>())->run();
EXPECT_TRUE(FastOS_File::Stat("flush/a9/snapshot-200", &stat));
EXPECT_EQUAL(seconds(stat._modifiedTime), duration_cast<seconds>(ft->getLastFlushTime().time_since_epoch()));
}
@@ -533,7 +534,7 @@ Test::requireThatLastFlushTimeIsReported()
{ // updated flush time after nothing to flush
std::this_thread::sleep_for(8000ms);
std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch());
- Executor::Task::UP task = ft->initFlush(200);
+ Executor::Task::UP task = ft->initFlush(200, std::make_shared<search::FlushToken>());
EXPECT_FALSE(task);
EXPECT_LESS(seconds(stat._modifiedTime), ft->getLastFlushTime().time_since_epoch());
EXPECT_APPROX(now.count(), duration_cast<seconds>(ft->getLastFlushTime().time_since_epoch()).count(), 8);
@@ -580,7 +581,7 @@ Test::requireThatShrinkWorks()
EXPECT_EQUAL(100u, av->getCommittedDocIdLimit());
EXPECT_EQUAL(createSerialNum - 1, ft->getFlushedSerialNum());
vespalib::ThreadStackExecutor exec(1, 128 * 1024);
- vespalib::Executor::Task::UP task = ft->initFlush(53);
+ vespalib::Executor::Task::UP task = ft->initFlush(53, std::make_shared<search::FlushToken>());
exec.execute(std::move(task));
exec.sync();
exec.shutdown();
@@ -611,7 +612,7 @@ Test::requireThatFlushedAttributeCanBeLoaded(const HwInfo &hwInfo)
}
av->commit();
IFlushTarget::SP ft = am.getFlushable(attrName);
- ft->initFlush(200)->run();
+ ft->initFlush(200, std::make_shared<search::FlushToken>())->run();
}
{
AttributeManagerFixture amf(f);
@@ -640,7 +641,7 @@ Test::requireThatFlushFailurePreventsSyncTokenUpdate()
EXPECT_EQUAL(1u, av->getNumDocs());
auto flush_target = am.getFlushable("a12");
EXPECT_EQUAL(0u, flush_target->getFlushedSerialNum());
- auto flush_task = flush_target->initFlush(200);
+ auto flush_task = flush_target->initFlush(200, std::make_shared<search::FlushToken>());
// Trigger flush failure
av->getEnumStoreBase()->inc_compaction_count();
flush_task->run();
diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
index 326cfba97f4..21b32b40c80 100644
--- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
@@ -11,6 +11,7 @@
#include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h>
#include <vespa/searchcore/proton/server/itlssyncer.h>
#include <vespa/searchlib/attribute/attributefilesavetarget.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/common/tunefileinfo.h>
#include <vespa/searchlib/fef/matchdatalayout.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
@@ -1900,7 +1901,7 @@ TEST(DocumentMetaStoreTest, shrink_via_flush_target_works)
ft->getApproxMemoryGain().getAfter());
vespalib::ThreadStackExecutor exec(1, 128 * 1024);
- vespalib::Executor::Task::UP task = ft->initFlush(11);
+ vespalib::Executor::Task::UP task = ft->initFlush(11, std::make_shared<search::FlushToken>());
exec.execute(std::move(task));
exec.sync();
exec.shutdown();
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index 38fca35ea87..a675a45aa54 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/searchcore/proton/server/igetserialnum.h>
#include <vespa/searchcore/proton/test/dummy_flush_handler.h>
#include <vespa/searchcore/proton/test/dummy_flush_target.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/test/insertion_operators.h>
#include <vespa/vespalib/testkit/testapp.h>
@@ -101,9 +102,9 @@ public:
{
}
- Task::UP initFlush(SerialNum currentSerial) override
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override
{
- Task::UP task(_target->initFlush(currentSerial));
+ Task::UP task(_target->initFlush(currentSerial, std::move(flush_token)));
if (task) {
return std::make_unique<WrappedFlushTask>(std::move(task),
_handler);
@@ -287,7 +288,7 @@ public:
return _flushedSerial;
}
- Task::UP initFlush(SerialNum currentSerial) override {
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) override {
LOG(info, "SimpleTarget(%s)::initFlush(%" PRIu64 ")", getName().c_str(), currentSerial);
_currentSerial = currentSerial;
_initDone.countDown();
@@ -639,7 +640,7 @@ TEST("require that threaded target works")
auto target = std::make_shared<ThreadedFlushTarget>(executor, getSerialNum, std::make_shared<SimpleTarget>());
EXPECT_FALSE(executor._done.await(SHORT_TIMEOUT));
- EXPECT_TRUE(target->initFlush(0).get() != NULL);
+ EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>()).get() != NULL);
EXPECT_TRUE(executor._done.await(LONG_TIMEOUT));
}
diff --git a/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp b/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp
index 6c502bccce1..acf3f66653d 100644
--- a/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp
@@ -3,6 +3,7 @@
#include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h>
#include <vespa/searchlib/common/i_compactable_lid_space.h>
+#include <vespa/searchlib/common/flush_token.h>
using namespace proton;
using search::SerialNum;
@@ -84,7 +85,7 @@ TEST_F("require that flush target returns no estimated memory gain when not able
TEST_F("require that flush target returns no estimated memory gain right after shrink", Fixture)
{
- auto task = f._ft->initFlush(20);
+ auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>());
EXPECT_TRUE(validTask(task));
task->run();
auto memoryGain = f._ft->getApproxMemoryGain();
@@ -96,7 +97,7 @@ TEST_F("require that flush target returns no estimated memory gain right after s
TEST_F("require that flush target returns no task when not able to flush", Fixture)
{
f._lidSpace->setCanShrink(false);
- auto task = f._ft->initFlush(20);
+ auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>());
EXPECT_FALSE(validTask(task));
EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum());
EXPECT_NOT_EQUAL(IFlushTarget::Time(), f._ft->getLastFlushTime());
@@ -105,14 +106,14 @@ TEST_F("require that flush target returns no task when not able to flush", Fixtu
TEST_F("require that flush target returns valid task when able to flush again", Fixture)
{
f._lidSpace->setCanShrink(false);
- auto task = f._ft->initFlush(20);
+ auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>());
EXPECT_FALSE(validTask(task));
EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum());
EXPECT_NOT_EQUAL(IFlushTarget::Time(), f._ft->getLastFlushTime());
f._lidSpace->setCanShrink(true);
auto memoryGain = f._ft->getApproxMemoryGain();
EXPECT_EQUAL(16, memoryGain.gain());
- task = f._ft->initFlush(20);
+ task = f._ft->initFlush(20, std::make_shared<search::FlushToken>());
EXPECT_TRUE(validTask(task));
task->run();
EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum());
diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
index 6831dd60bd5..acd5c86fd5d 100644
--- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp
+++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/searchcore/proton/server/executorthreadingservice.h>
#include <vespa/searchcorespi/index/fusionrunner.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/diskindex/diskindex.h>
#include <vespa/searchlib/diskindex/indexbuilder.h>
#include <vespa/searchlib/fef/matchdatalayout.h>
@@ -252,13 +253,13 @@ void Test::checkResults(uint32_t fusion_id, const uint32_t *ids, size_t size) {
}
void Test::requireThatNoDiskIndexesGiveId0() {
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
EXPECT_EQUAL(0u, fusion_id);
}
void Test::requireThatOneDiskIndexCausesCopy() {
createIndex(base_dir, disk_id[0]);
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
EXPECT_EQUAL(disk_id[0], fusion_id);
set<uint32_t> fusion_ids = readFusionIds(base_dir);
ASSERT_TRUE(!fusion_ids.empty());
@@ -271,7 +272,7 @@ void Test::requireThatOneDiskIndexCausesCopy() {
void Test::requireThatTwoDiskIndexesCauseFusion() {
createIndex(base_dir, disk_id[0]);
createIndex(base_dir, disk_id[1]);
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
EXPECT_EQUAL(disk_id[1], fusion_id);
set<uint32_t> fusion_ids = readFusionIds(base_dir);
ASSERT_TRUE(!fusion_ids.empty());
@@ -286,7 +287,7 @@ void Test::requireThatFusionCanRunOnMultipleDiskIndexes() {
createIndex(base_dir, disk_id[1]);
createIndex(base_dir, disk_id[2]);
createIndex(base_dir, disk_id[3]);
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
EXPECT_EQUAL(disk_id[3], fusion_id);
set<uint32_t> fusion_ids = readFusionIds(base_dir);
ASSERT_TRUE(!fusion_ids.empty());
@@ -299,7 +300,7 @@ void Test::requireThatFusionCanRunOnMultipleDiskIndexes() {
void Test::requireThatOldFusionIndexCanBePartOfNewFusion() {
createIndex(base_dir, disk_id[0], true);
createIndex(base_dir, disk_id[1]);
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
EXPECT_EQUAL(disk_id[1], fusion_id);
set<uint32_t> fusion_ids = readFusionIds(base_dir);
ASSERT_TRUE(!fusion_ids.empty());
@@ -313,12 +314,12 @@ void Test::requireThatOldFusionIndexCanBePartOfNewFusion() {
void Test::requireThatSelectorsCanBeRebased() {
createIndex(base_dir, disk_id[0]);
createIndex(base_dir, disk_id[1]);
- uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
_fusion_spec.flush_ids.clear();
_fusion_spec.last_fusion_id = fusion_id;
createIndex(base_dir, disk_id[2]);
- fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops);
+ fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>());
checkResults(fusion_id, disk_id, 3);
}
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 89594821803..67eb11cee3e 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -8,6 +8,7 @@
#include <vespa/searchcorespi/index/indexflushtarget.h>
#include <vespa/searchcorespi/index/indexfusiontarget.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/common/serialnum.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
@@ -278,7 +279,7 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed)
IndexFlushTarget target(_index_manager->getMaintainer());
EXPECT_EQ(vespalib::system_time(), target.getLastFlushTime());
vespalib::Executor::Task::UP flushTask;
- runAsMaster([&]() { flushTask = target.initFlush(1); });
+ runAsMaster([&]() { flushTask = target.initFlush(1, std::make_shared<search::FlushToken>()); });
flushTask->run();
EXPECT_TRUE(FastOS_File::Stat("test_data/index.flush.1", &stat));
EXPECT_EQ(seconds(stat._modifiedTime), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch()));
@@ -305,7 +306,7 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed)
std::this_thread::sleep_for(8s);
std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch());
vespalib::Executor::Task::UP task;
- runAsMaster([&]() { task = target.initFlush(2); });
+ runAsMaster([&]() { task = target.initFlush(2, std::make_shared<search::FlushToken>()); });
EXPECT_FALSE(task);
EXPECT_EQ(2u, target.getFlushedSerialNum());
EXPECT_LT(seconds(stat._modifiedTime), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch()));
@@ -480,7 +481,7 @@ TEST_F(IndexManagerTest, require_that_fusion_updates_indexes)
FusionSpec fusion_spec;
fusion_spec.flush_ids.assign(ids, ids + 4);
- _index_manager->getMaintainer().runFusion(fusion_spec);
+ _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>());
set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion");
EXPECT_EQ(1u, fusion_ids.size());
@@ -502,7 +503,7 @@ TEST_F(IndexManagerTest, require_that_flush_triggers_fusion)
flushIndexManager();
}
IFlushTarget::SP target(new IndexFusionTarget(_index_manager->getMaintainer()));
- target->initFlush(0)->run();
+ target->initFlush(0, std::make_shared<search::FlushToken>())->run();
addDocument(docid);
flushIndexManager();
set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion");
@@ -549,7 +550,7 @@ TEST_F(IndexManagerTest, require_that_fusion_cleans_up_old_indexes)
FusionSpec fusion_spec;
fusion_spec.flush_ids.push_back(1);
fusion_spec.flush_ids.push_back(2);
- _index_manager->getMaintainer().runFusion(fusion_spec);
+ _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>());
flush_ids = readDiskIds(index_dir, "flush");
EXPECT_EQ(1u, flush_ids.size());
@@ -597,7 +598,7 @@ TEST_F(IndexManagerTest, require_that_disk_indexes_are_loaded_on_startup)
FusionSpec fusion_spec;
fusion_spec.flush_ids.push_back(1);
fusion_spec.flush_ids.push_back(2);
- _index_manager->getMaintainer().runFusion(fusion_spec);
+ _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>());
_index_manager.reset(0);
ASSERT_TRUE(!indexExists("flush", 1));
@@ -644,7 +645,7 @@ TEST_F(IndexManagerTest, require_that_existing_indexes_are_to_be_fusioned_on_sta
resetIndexManager();
IFlushTarget::SP target(new IndexFusionTarget(_index_manager->getMaintainer()));
- target->initFlush(0)->run();
+ target->initFlush(0, std::make_shared<search::FlushToken>())->run();
addDocument(docid);
flushIndexManager();
@@ -670,7 +671,7 @@ TEST_F(IndexManagerTest, require_that_serial_number_is_copied_on_fusion)
FusionSpec fusion_spec;
fusion_spec.flush_ids.push_back(1);
fusion_spec.flush_ids.push_back(2);
- _index_manager->getMaintainer().runFusion(fusion_spec);
+ _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>());
FastOS_File file((index_dir + "/index.fusion.2/serial.dat").c_str());
EXPECT_TRUE(file.OpenReadOnly());
}
@@ -711,7 +712,7 @@ TEST_F(IndexManagerTest, require_that_failed_fusion_is_retried)
crippleFusion(2);
IndexFusionTarget target(_index_manager->getMaintainer());
- vespalib::Executor::Task::UP fusionTask = target.initFlush(1);
+ vespalib::Executor::Task::UP fusionTask = target.initFlush(1, std::make_shared<search::FlushToken>());
fusionTask->run();
FusionSpec spec = _index_manager->getMaintainer().getFusionSpec();
diff --git a/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp b/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp
index 4ed01f8caf7..2238e3c4f0c 100644
--- a/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp
+++ b/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp
@@ -6,6 +6,7 @@ LOG_SETUP("job_tracked_flush_test");
#include <vespa/searchcore/proton/metrics/job_tracked_flush_task.h>
#include <vespa/searchcore/proton/test/dummy_flush_target.h>
#include <vespa/searchcore/proton/test/simple_job_tracker.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
@@ -46,7 +47,7 @@ struct MyFlushTarget : public test::DummyFlushTarget
{}
// Implements searchcorespi::IFlushTarget
- virtual FlushTask::UP initFlush(SerialNum currentSerial) override {
+ virtual FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) override {
if (currentSerial > 0) {
_initFlushSerial = currentSerial;
_initGate.await(5000);
@@ -74,7 +75,7 @@ struct Fixture
{
}
void initFlush(SerialNum currentSerial) {
- _task = _trackedFlush.initFlush(currentSerial);
+ _task = _trackedFlush.initFlush(currentSerial, std::make_shared<search::FlushToken>());
_taskGate.countDown();
}
};
@@ -130,7 +131,7 @@ TEST_F("require that flush task execution is tracked", Fixture(2))
TEST_F("require that nullptr flush task is not tracked", Fixture)
{
- FlushTask::UP task = f._trackedFlush.initFlush(0);
+ FlushTask::UP task = f._trackedFlush.initFlush(0, std::make_shared<search::FlushToken>());
EXPECT_TRUE(task.get() == nullptr);
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
index af7bae32b11..6399c52696c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
@@ -3,6 +3,7 @@
#include "attribute_populator.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/searchlib/attribute/attributevector.h>
@@ -88,7 +89,7 @@ AttributePopulator::done()
auto flushTargets = mgr->getFlushTargets();
for (const auto &flushTarget : flushTargets) {
assert(flushTarget->getFlushedSerialNum() < _configSerialNum);
- auto task = flushTarget->initFlush(_configSerialNum);
+ auto task = flushTarget->initFlush(_configSerialNum, std::make_shared<search::FlushToken>());
// shrink target only return task if able to shrink.
if (task) {
task->run();
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
index 504f6841daf..cc1af923fc0 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
@@ -13,6 +13,7 @@
#include <vespa/searchlib/attribute/attributecontext.h>
#include <vespa/searchlib/attribute/attribute_read_guard.h>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchcommon/attribute/i_attribute_functor.h>
#include <vespa/searchlib/attribute/interlock.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -326,7 +327,7 @@ AttributeManager::flushAll(SerialNum currentSerial)
auto flushTargets = getFlushTargets();
for (const auto &ft : flushTargets) {
vespalib::Executor::Task::UP task;
- task = ft->initFlush(currentSerial);
+ task = ft->initFlush(currentSerial, std::make_shared<search::FlushToken>());
if (task.get() != NULL) {
task->run();
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
index 4edb64b861a..9bff8e2a438 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
@@ -225,7 +225,7 @@ FlushableAttribute::internalInitFlush(SerialNum currentSerial)
IFlushTarget::Task::UP
-FlushableAttribute::initFlush(SerialNum currentSerial)
+FlushableAttribute::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
// Called by document db executor
std::promise<IFlushTarget::Task::UP> promise;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
index a759bcce26e..c6654392372 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h
@@ -69,7 +69,7 @@ public:
virtual DiskGain getApproxDiskGain() const override;
virtual Time getLastFlushTime() const override;
virtual SerialNum getFlushedSerialNum() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
virtual double get_replay_operation_cost() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
index 79f32d60056..6bda77f97a0 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp
@@ -74,7 +74,7 @@ SummaryCompactTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
-SummaryCompactTarget::initFlush(SerialNum currentSerial)
+SummaryCompactTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
std::promise<Task::UP> promise;
std::future<Task::UP> future = promise.get_future();
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
index 5dcfb2b9c10..8efd4c0d3bf 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h
@@ -28,7 +28,7 @@ public:
virtual SerialNum getFlushedSerialNum() const override;
virtual Time getLastFlushTime() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
index a101ad4d83c..8103a28db5f 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp
@@ -85,7 +85,7 @@ SummaryFlushTarget::internalInitFlush(SerialNum currentSerial) {
return std::make_unique<Flusher>(_docStore, _lastStats, currentSerial);
}
IFlushTarget::Task::UP
-SummaryFlushTarget::initFlush(SerialNum currentSerial)
+SummaryFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
// Called by document db executor
std::promise<Task::UP> promise;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
index 7e450649a52..4912d99f7f7 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h
@@ -29,7 +29,7 @@ public:
virtual SerialNum getFlushedSerialNum() const override;
virtual Time getLastFlushTime() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 6abbf477959..60853f765ea 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -53,7 +53,7 @@ public:
searchcorespi::index::IThreadService & summaryService,
std::shared_ptr<ICompactableLidSpace> target);
~ShrinkSummaryLidSpaceFlushTarget() override;
- Task::UP initFlush(SerialNum currentSerial) override;
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
};
ShrinkSummaryLidSpaceFlushTarget::
@@ -69,11 +69,11 @@ ShrinkSummaryLidSpaceFlushTarget(const vespalib::string &name, Type type, Compon
ShrinkSummaryLidSpaceFlushTarget::~ShrinkSummaryLidSpaceFlushTarget() = default;
IFlushTarget::Task::UP
-ShrinkSummaryLidSpaceFlushTarget::initFlush(SerialNum currentSerial)
+ShrinkSummaryLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token)
{
std::promise<Task::UP> promise;
std::future<Task::UP> future = promise.get_future();
- _summaryService.execute(makeLambdaTask([&]() { promise.set_value(ShrinkLidSpaceFlushTarget::initFlush(currentSerial)); }));
+ _summaryService.execute(makeLambdaTask([&]() { promise.set_value(ShrinkLidSpaceFlushTarget::initFlush(currentSerial, flush_token)); }));
return future.get();
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
index ad3a56b3d56..dcfa7b13970 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp
@@ -211,7 +211,7 @@ DocumentMetaStoreFlushTarget::getLastFlushTime() const
IFlushTarget::Task::UP
-DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial)
+DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
// Called by document db executor
_dms->removeAllOldGenerations();
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
index 5cc674d3614..c670279e3fa 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h
@@ -58,7 +58,7 @@ public:
DiskGain getApproxDiskGain() const override;
Time getLastFlushTime() const override;
SerialNum getFlushedSerialNum() const override;
- Task::UP initFlush(SerialNum currentSerial) override;
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
FlushStats getLastFlushStats() const override { return _lastStats; }
static void initCleanup(const vespalib::string &baseDir);
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
index e2bf9783710..ecd66f9cbfb 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h
@@ -48,7 +48,7 @@ public:
virtual Time getLastFlushTime() const override { return _lastFlushTime; }
virtual bool needUrgentFlush() const override { return _needUrgentFlush; }
- virtual Task::UP initFlush(SerialNum currentSerial) override { return _target->initFlush(currentSerial); }
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { return _target->initFlush(currentSerial, std::move(flush_token)); }
virtual FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
index ebbe2171e63..b569e499876 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp
@@ -34,10 +34,10 @@ FlushContext::~FlushContext()
}
bool
-FlushContext::initFlush()
+FlushContext::initFlush(std::shared_ptr<search::IFlushToken> flush_token)
{
LOG(debug, "Attempting to flush '%s'.", _name.c_str());
- _task = _target->initFlush(std::max(_handler->getCurrentSerialNumber(), _lastSerial));
+ _task = _target->initFlush(std::max(_handler->getCurrentSerialNumber(), _lastSerial), std::move(flush_token));
if (_task.get() == NULL) {
LOG(debug, "Target refused to init flush.");
return false;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
index 7ce8210db2f..2c380cc28ed 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h
@@ -54,9 +54,9 @@ public:
* signature. If this method returns true, the task to complete the flush is
* available through getTask().
*
- * @param True if a flush was initiated.
+ * @return True if a flush was initiated.
*/
- bool initFlush();
+ bool initFlush(std::shared_ptr<search::IFlushToken> flush_token);
/**
* Returns the unique name of this context. This is the concatenation of the
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index a177aa608e7..5f35ecc916d 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -7,6 +7,7 @@
#include "tls_stats_map.h"
#include "tls_stats_factory.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/vespalib/util/jsonwriter.h>
#include <thread>
@@ -276,7 +277,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
if (LOG_WOULD_LOG(event)) {
EventLogger::flushInit(it->getName());
}
- if (it->initFlush()) {
+ if (it->initFlush(std::make_shared<search::FlushToken>())) {
ctx = it;
break;
}
@@ -293,7 +294,7 @@ FlushEngine::flushAll(const FlushContext::List &lst)
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
if (wait(0)) {
- if (ctx->initFlush()) {
+ if (ctx->initFlush(std::make_shared<search::FlushToken>())) {
logTarget("initiated", *ctx);
_executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
index d51d5a113cb..f264555be1a 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp
@@ -59,9 +59,9 @@ FlushTargetProxy::needUrgentFlush() const
IFlushTarget::Task::UP
-FlushTargetProxy::initFlush(SerialNum currentSerial)
+FlushTargetProxy::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token)
{
- return _target->initFlush(currentSerial);
+ return _target->initFlush(currentSerial, std::move(flush_token));
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
index 2b50bf2e222..84bf91a2f9a 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h
@@ -62,7 +62,7 @@ public:
needUrgentFlush() const override;
virtual Task::UP
- initFlush(SerialNum currentSerial) override;
+ initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual searchcorespi::FlushStats
getLastFlushStats() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
index e9c2554f13c..45fcd1d19aa 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp
@@ -87,7 +87,7 @@ ShrinkLidSpaceFlushTarget::needUrgentFlush() const
}
IFlushTarget::Task::UP
-ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial)
+ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>)
{
if (currentSerial < _flushedSerialNum) {
_lastFlushTime = vespalib::system_clock::now();
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
index c1ecb2c2b4a..ebd8a54004b 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h
@@ -47,7 +47,7 @@ public:
SerialNum getFlushedSerialNum() const override;
Time getLastFlushTime() const override;
bool needUrgentFlush() const override;
- Task::UP initFlush(SerialNum currentSerial) override;
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
searchcorespi::FlushStats getLastFlushStats() const override;
uint64_t getApproxBytesToWriteToDisk() const override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp
index 3c1ac1b5361..59a5504b57b 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp
@@ -34,25 +34,26 @@ ThreadedFlushTarget::ThreadedFlushTarget(vespalib::Executor &executor,
namespace {
IFlushTarget::Task::UP
callInitFlush(IFlushTarget *target, IFlushTarget::SerialNum serial,
- const IGetSerialNum *getSerialNum) {
+ const IGetSerialNum *getSerialNum, std::shared_ptr<search::IFlushToken> flush_token) {
// Serial number from flush engine might have become stale, obtain
// a fresh serial number now.
(void) serial;
search::SerialNum freshSerial = getSerialNum->getSerialNum();
assert(freshSerial >= serial);
- return target->initFlush(freshSerial);
+ return target->initFlush(freshSerial, std::move(flush_token));
}
} // namespace
IFlushTarget::Task::UP
-ThreadedFlushTarget::initFlush(SerialNum currentSerial)
+ThreadedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token)
{
std::promise<Task::UP> promise;
std::future<Task::UP> future = promise.get_future();
_executor.execute(makeLambdaTask([&]()
{ promise.set_value(callInitFlush(_target.get(),
currentSerial,
- &_getSerialNum)); }));
+ &_getSerialNum,
+ flush_token)); }));
return future.get();
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h
index 26cdb6903d3..cd2ee11265f 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h
@@ -50,7 +50,7 @@ public:
// Implements IFlushTarget.
virtual Task::UP
- initFlush(SerialNum currentSerial) override;
+ initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
index d93f02c7994..69790b3dceb 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
@@ -3,7 +3,6 @@
#include "indexmanager.h"
#include "diskindexwrapper.h"
#include "memoryindexwrapper.h"
-#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/common/serialnumfileheadercontext.h>
#include <vespa/searchlib/diskindex/fusion.h>
@@ -12,7 +11,7 @@ using search::common::FileHeaderContext;
using search::common::SerialNumFileHeaderContext;
using search::index::Schema;
using search::index::SchemaUtil;
-using search::FlushToken;
+using search::IFlushToken;
using search::TuneFileIndexing;
using search::TuneFileIndexManager;
using search::TuneFileSearch;
@@ -65,12 +64,13 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema,
const vespalib::string &outputDir,
const std::vector<vespalib::string> &sources,
const SelectorArray &selectorArray,
- SerialNum serialNum)
+ SerialNum serialNum,
+ std::shared_ptr<IFlushToken> flush_token)
{
SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum);
const bool dynamic_k_doc_pos_occ_format = false;
return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format,
- _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::make_shared<FlushToken>());
+ _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::move(flush_token));
}
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
index 7e305681f78..7cf25292d1c 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
@@ -56,7 +56,8 @@ public:
bool runFusion(const Schema &schema, const vespalib::string &outputDir,
const std::vector<vespalib::string> &sources,
const SelectorArray &docIdSelector,
- search::SerialNum lastSerialNum) override;
+ search::SerialNum lastSerialNum,
+ std::shared_ptr<search::IFlushToken> flush_token) override;
};
private:
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
index 14d76645fc7..cb75472a748 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
@@ -19,10 +19,10 @@ JobTrackedFlushTarget::JobTrackedFlushTarget(const IJobTracker::SP &tracker,
JobTrackedFlushTarget::~JobTrackedFlushTarget() {}
FlushTask::UP
-JobTrackedFlushTarget::initFlush(SerialNum currentSerial)
+JobTrackedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token)
{
_tracker->start();
- FlushTask::UP targetTask = _target->initFlush(currentSerial);
+ FlushTask::UP targetTask = _target->initFlush(currentSerial, std::move(flush_token));
_tracker->end();
if (targetTask.get() != nullptr) {
return std::make_unique<JobTrackedFlushTask>(_tracker, std::move(targetTask));
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
index fdfe6986ec3..053f6b75ddd 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
@@ -40,7 +40,7 @@ public:
virtual bool needUrgentFlush() const override {
return _target->needUrgentFlush();
}
- virtual searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial) override;
+ virtual searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual searchcorespi::FlushStats getLastFlushStats() const override {
return _target->getLastFlushStats();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
index 0bd50fc0104..7dbf54cfd6c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
@@ -19,6 +19,7 @@
#include <vespa/searchcore/proton/matching/sessionmanager.h>
#include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h>
#include <vespa/searchlib/attribute/configconverter.h>
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/docstore/document_store_visitor_progress.h>
#include <vespa/searchlib/util/fileheadertk.h>
#include <vespa/vespalib/io/fileutil.h>
@@ -482,7 +483,7 @@ StoreOnlyDocSubDB::close()
assert(_writeService.master().isCurrentThread());
search::IDocumentStore & store(_rSummaryMgr->getBackingStore());
auto summaryFlush = std::make_shared<SummaryFlushTarget>(store, _writeService.summary());
- auto summaryFlushTask = summaryFlush->initFlush(store.tentativeLastSyncToken());
+ auto summaryFlushTask = summaryFlush->initFlush(store.tentativeLastSyncToken(), std::make_shared<search::FlushToken>());
if (summaryFlushTask) {
SerialNum syncToken = summaryFlushTask->getFlushSerial();
_tlSyncer.sync(syncToken);
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
index dd3cd49df2f..4178780da98 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
@@ -20,7 +20,7 @@ struct DummyFlushTarget : public searchcorespi::IFlushTarget
SerialNum getFlushedSerialNum() const override { return 0; }
Time getLastFlushTime() const override { return Time(); }
bool needUrgentFlush() const override { return false; }
- searchcorespi::FlushTask::UP initFlush(SerialNum) override {
+ searchcorespi::FlushTask::UP initFlush(SerialNum, std::shared_ptr<search::IFlushToken>) override {
return searchcorespi::FlushTask::UP();
}
searchcorespi::FlushStats getLastFlushStats() const override {
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
index 3a67333c9d5..36a4a320ad4 100644
--- a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
@@ -6,6 +6,8 @@
#include <vespa/vespalib/util/time.h>
#include <vector>
+namespace search { class IFlushToken; }
+
namespace searchcorespi {
/**
@@ -172,7 +174,7 @@ public:
* @param currentSerial The current transaction serial number.
* @return The task used to complete the flush.
*/
- virtual Task::UP initFlush(SerialNum currentSerial) = 0;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0;
/**
* Returns the stats for the last completed flush operation
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
index 841b24af576..f7e818d2d0b 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
@@ -84,7 +84,8 @@ writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id,
uint32_t
FusionRunner::fuse(const FusionSpec &fusion_spec,
SerialNum lastSerialNum,
- IIndexMaintainerOperations &operations)
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token)
{
const vector<uint32_t> &ids = fusion_spec.flush_ids;
if (ids.empty()) {
@@ -113,7 +114,7 @@ FusionRunner::fuse(const FusionSpec &fusion_spec,
SelectorArray selector_array;
readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id);
- if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum)) {
+ if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) {
return 0;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
index 44323b06932..ddb23c33dd6 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
@@ -56,7 +56,8 @@ public:
**/
uint32_t fuse(const FusionSpec &fusion_spec,
search::SerialNum lastSerialNum,
- IIndexMaintainerOperations &operations);
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token);
};
} // namespace index
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
index 99f17b12b79..49f463c3631 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
@@ -8,6 +8,8 @@
#include <vespa/searchlib/diskindex/docidmapper.h>
#include <vespa/searchlib/index/i_field_length_inspector.h>
+namespace search { class IFlushToken; }
+
namespace searchcorespi::index {
/**
@@ -52,7 +54,8 @@ struct IIndexMaintainerOperations {
const vespalib::string &outputDir,
const std::vector<vespalib::string> &sources,
const SelectorArray &selectorArray,
- search::SerialNum lastSerialNum) = 0;
+ search::SerialNum lastSerialNum,
+ std::shared_ptr<search::IFlushToken> flush_token) = 0;
};
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
index e7bc26b8dd1..a88b5eed414 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -60,7 +60,7 @@ IndexFlushTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
-IndexFlushTarget::initFlush(SerialNum serialNum)
+IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>)
{
// the target must live until this task is done (handled by flush engine).
return _indexMaintainer.initFlush(serialNum, &_lastStats);
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
index 8769a3ee66d..50ae2000a11 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
@@ -30,7 +30,7 @@ public:
virtual bool needUrgentFlush() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
};
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
index b538e817cf8..64f4584e465 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -15,15 +15,17 @@ private:
IndexMaintainer &_indexMaintainer;
FlushStats &_stats;
SerialNum _serialNum;
+ std::shared_ptr<search::IFlushToken> _flush_token;
public:
- Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) :
+ Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) :
_indexMaintainer(indexMaintainer),
_stats(stats),
- _serialNum(serialNum)
+ _serialNum(serialNum),
+ _flush_token(std::move(flush_token))
{}
void run() override {
- vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum);
+ vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token);
// the target must live until this task is done (handled by flush engine).
_stats.setPath(outputFusionDir);
}
@@ -86,9 +88,9 @@ IndexFusionTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
-IndexFusionTarget::initFlush(SerialNum serialNum)
+IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
{
- return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum);
+ return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token));
}
uint64_t
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
index 98a61fb12ed..e4a9c803101 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
@@ -26,7 +26,7 @@ public:
virtual Time getLastFlushTime() const override;
virtual bool needUrgentFlush() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
};
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index c75a74ff141..e2bcb8b7629 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -962,7 +962,7 @@ IndexMaintainer::getFusionSpec()
}
string
-IndexMaintainer::doFusion(SerialNum serialNum)
+IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
{
// Called by a flush engine worker thread
@@ -984,7 +984,7 @@ IndexMaintainer::doFusion(SerialNum serialNum)
_fusion_spec.flush_ids.clear();
}
- uint32_t new_fusion_id = runFusion(spec);
+ uint32_t new_fusion_id = runFusion(spec, std::move(flush_token));
LockGuard lock(_fusion_lock);
if (new_fusion_id == spec.last_fusion_id) { // Error running fusion.
@@ -1000,7 +1000,7 @@ IndexMaintainer::doFusion(SerialNum serialNum)
uint32_t
-IndexMaintainer::runFusion(const FusionSpec &fusion_spec)
+IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token)
{
// Called by a flush engine worker thread
FusionArgs args;
@@ -1020,7 +1020,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec)
serialNum = IndexReadUtilities::readSerialNum(lastFlushDir);
}
FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext());
- uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations);
+ uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token));
bool ok = (new_fusion_id != 0);
if (ok) {
ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()),
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index 5ff805b53f7..d9d2479833f 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -292,8 +292,8 @@ public:
/**
* Runs fusion for any available specs and return the output fusion directory.
*/
- vespalib::string doFusion(SerialNum serialNum);
- uint32_t runFusion(const FusionSpec &fusion_spec);
+ vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token);
+ uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token);
void removeOldDiskIndexes();
struct FlushStats {