diff options
44 files changed, 122 insertions, 100 deletions
diff --git a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp index adf2f4a7d7d..d98de21ec5e 100644 --- a/searchcore/src/tests/proton/attribute/attributeflush_test.cpp +++ b/searchcore/src/tests/proton/attribute/attributeflush_test.cpp @@ -9,6 +9,7 @@ #include <vespa/searchlib/attribute/attributefactory.h> #include <vespa/searchlib/attribute/integerbase.h> #include <vespa/searchlib/common/indexmetainfo.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/test/directory_handler.h> #include <vespa/vespalib/datastore/datastorebase.h> @@ -119,7 +120,7 @@ UpdaterTask::startFlushing(uint64_t syncToken, FlushHandler & handler) handler.gate.reset(new Gate()); IFlushTarget::SP flushable = _am.getFlushable("a1"); LOG(info, "startFlushing(%" PRIu64 ")", syncToken); - handler.doFlushing(flushable->initFlush(syncToken)); + handler.doFlushing(flushable->initFlush(syncToken, std::make_shared<search::FlushToken>())); } @@ -377,13 +378,13 @@ Test::requireThatFlushableAttributeManagesSyncTokenInfo() IndexMetaInfo info("flush/a3"); EXPECT_EQUAL(0u, fa->getFlushedSerialNum()); - EXPECT_TRUE(fa->initFlush(0).get() == NULL); + EXPECT_TRUE(fa->initFlush(0, std::make_shared<search::FlushToken>()).get() == NULL); EXPECT_TRUE(!info.load()); av->commit(10, 10); // last sync token = 10 EXPECT_EQUAL(0u, fa->getFlushedSerialNum()); - EXPECT_TRUE(fa->initFlush(10).get() != NULL); - fa->initFlush(10)->run(); + EXPECT_TRUE(fa->initFlush(10, std::make_shared<search::FlushToken>()).get() != NULL); + fa->initFlush(10, std::make_shared<search::FlushToken>())->run(); EXPECT_EQUAL(10u, fa->getFlushedSerialNum()); EXPECT_TRUE(info.load()); EXPECT_EQUAL(1u, info.snapshots().size()); @@ -392,7 +393,7 @@ Test::requireThatFlushableAttributeManagesSyncTokenInfo() av->commit(20, 20); // last sync token = 20 EXPECT_EQUAL(10u, fa->getFlushedSerialNum()); - fa->initFlush(20)->run(); + fa->initFlush(20, std::make_shared<search::FlushToken>())->run(); EXPECT_EQUAL(20u, fa->getFlushedSerialNum()); EXPECT_TRUE(info.load()); EXPECT_EQUAL(1u, info.snapshots().size()); // snapshot 10 removed @@ -441,7 +442,7 @@ Test::requireThatCleanUpIsPerformedAfterFlush() FlushableAttribute fa(av, diskLayout->getAttributeDir("a6"), TuneFileAttributes(), f._fileHeaderContext, f._attributeFieldWriter, f._hwInfo); - fa.initFlush(30)->run(); + fa.initFlush(30, std::make_shared<search::FlushToken>())->run(); EXPECT_TRUE(info.load()); EXPECT_EQUAL(1u, info.snapshots().size()); // snapshots 10 & 20 removed @@ -462,7 +463,7 @@ Test::requireThatFlushStatsAreUpdated() av->addDocs(1); av->commit(100,100); IFlushTarget::SP ft = am.getFlushable("a7"); - ft->initFlush(101)->run(); + ft->initFlush(101, std::make_shared<search::FlushToken>())->run(); FlushStats stats = ft->getLastFlushStats(); EXPECT_EQUAL("flush/a7/snapshot-101", stats.getPath()); EXPECT_EQUAL(8u, stats.getPathElementsToLog()); @@ -483,7 +484,7 @@ Test::requireThatOnlyOneFlusherCanRunAtTheSameTime() for (size_t i = 10; i < 100; ++i) { av->commit(i, i); - vespalib::Executor::Task::UP task = ft->initFlush(i); + vespalib::Executor::Task::UP task = ft->initFlush(i, std::make_shared<search::FlushToken>()); if (task) { exec.execute(std::move(task)); } @@ -520,7 +521,7 @@ Test::requireThatLastFlushTimeIsReported() AttributeVector::SP av = amf.addAttribute("a9"); IFlushTarget::SP ft = am.getFlushable("a9"); EXPECT_EQUAL(vespalib::system_time(), ft->getLastFlushTime()); - ft->initFlush(200)->run(); + ft->initFlush(200, std::make_shared<search::FlushToken>())->run(); EXPECT_TRUE(FastOS_File::Stat("flush/a9/snapshot-200", &stat)); EXPECT_EQUAL(seconds(stat._modifiedTime), duration_cast<seconds>(ft->getLastFlushTime().time_since_epoch())); } @@ -533,7 +534,7 @@ Test::requireThatLastFlushTimeIsReported() { // updated flush time after nothing to flush std::this_thread::sleep_for(8000ms); std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch()); - Executor::Task::UP task = ft->initFlush(200); + Executor::Task::UP task = ft->initFlush(200, std::make_shared<search::FlushToken>()); EXPECT_FALSE(task); EXPECT_LESS(seconds(stat._modifiedTime), ft->getLastFlushTime().time_since_epoch()); EXPECT_APPROX(now.count(), duration_cast<seconds>(ft->getLastFlushTime().time_since_epoch()).count(), 8); @@ -580,7 +581,7 @@ Test::requireThatShrinkWorks() EXPECT_EQUAL(100u, av->getCommittedDocIdLimit()); EXPECT_EQUAL(createSerialNum - 1, ft->getFlushedSerialNum()); vespalib::ThreadStackExecutor exec(1, 128 * 1024); - vespalib::Executor::Task::UP task = ft->initFlush(53); + vespalib::Executor::Task::UP task = ft->initFlush(53, std::make_shared<search::FlushToken>()); exec.execute(std::move(task)); exec.sync(); exec.shutdown(); @@ -611,7 +612,7 @@ Test::requireThatFlushedAttributeCanBeLoaded(const HwInfo &hwInfo) } av->commit(); IFlushTarget::SP ft = am.getFlushable(attrName); - ft->initFlush(200)->run(); + ft->initFlush(200, std::make_shared<search::FlushToken>())->run(); } { AttributeManagerFixture amf(f); @@ -640,7 +641,7 @@ Test::requireThatFlushFailurePreventsSyncTokenUpdate() EXPECT_EQUAL(1u, av->getNumDocs()); auto flush_target = am.getFlushable("a12"); EXPECT_EQUAL(0u, flush_target->getFlushedSerialNum()); - auto flush_task = flush_target->initFlush(200); + auto flush_task = flush_target->initFlush(200, std::make_shared<search::FlushToken>()); // Trigger flush failure av->getEnumStoreBase()->inc_compaction_count(); flush_task->run(); diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index 326cfba97f4..21b32b40c80 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -11,6 +11,7 @@ #include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h> #include <vespa/searchcore/proton/server/itlssyncer.h> #include <vespa/searchlib/attribute/attributefilesavetarget.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/common/tunefileinfo.h> #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -1900,7 +1901,7 @@ TEST(DocumentMetaStoreTest, shrink_via_flush_target_works) ft->getApproxMemoryGain().getAfter()); vespalib::ThreadStackExecutor exec(1, 128 * 1024); - vespalib::Executor::Task::UP task = ft->initFlush(11); + vespalib::Executor::Task::UP task = ft->initFlush(11, std::make_shared<search::FlushToken>()); exec.execute(std::move(task)); exec.sync(); exec.shutdown(); diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index 38fca35ea87..a675a45aa54 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -9,6 +9,7 @@ #include <vespa/searchcore/proton/server/igetserialnum.h> #include <vespa/searchcore/proton/test/dummy_flush_handler.h> #include <vespa/searchcore/proton/test/dummy_flush_target.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/test/insertion_operators.h> #include <vespa/vespalib/testkit/testapp.h> @@ -101,9 +102,9 @@ public: { } - Task::UP initFlush(SerialNum currentSerial) override + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { - Task::UP task(_target->initFlush(currentSerial)); + Task::UP task(_target->initFlush(currentSerial, std::move(flush_token))); if (task) { return std::make_unique<WrappedFlushTask>(std::move(task), _handler); @@ -287,7 +288,7 @@ public: return _flushedSerial; } - Task::UP initFlush(SerialNum currentSerial) override { + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) override { LOG(info, "SimpleTarget(%s)::initFlush(%" PRIu64 ")", getName().c_str(), currentSerial); _currentSerial = currentSerial; _initDone.countDown(); @@ -639,7 +640,7 @@ TEST("require that threaded target works") auto target = std::make_shared<ThreadedFlushTarget>(executor, getSerialNum, std::make_shared<SimpleTarget>()); EXPECT_FALSE(executor._done.await(SHORT_TIMEOUT)); - EXPECT_TRUE(target->initFlush(0).get() != NULL); + EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>()).get() != NULL); EXPECT_TRUE(executor._done.await(LONG_TIMEOUT)); } diff --git a/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp b/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp index 6c502bccce1..acf3f66653d 100644 --- a/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp +++ b/searchcore/src/tests/proton/flushengine/shrink_lid_space_flush_target/shrink_lid_space_flush_target_test.cpp @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h> #include <vespa/searchlib/common/i_compactable_lid_space.h> +#include <vespa/searchlib/common/flush_token.h> using namespace proton; using search::SerialNum; @@ -84,7 +85,7 @@ TEST_F("require that flush target returns no estimated memory gain when not able TEST_F("require that flush target returns no estimated memory gain right after shrink", Fixture) { - auto task = f._ft->initFlush(20); + auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>()); EXPECT_TRUE(validTask(task)); task->run(); auto memoryGain = f._ft->getApproxMemoryGain(); @@ -96,7 +97,7 @@ TEST_F("require that flush target returns no estimated memory gain right after s TEST_F("require that flush target returns no task when not able to flush", Fixture) { f._lidSpace->setCanShrink(false); - auto task = f._ft->initFlush(20); + auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>()); EXPECT_FALSE(validTask(task)); EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum()); EXPECT_NOT_EQUAL(IFlushTarget::Time(), f._ft->getLastFlushTime()); @@ -105,14 +106,14 @@ TEST_F("require that flush target returns no task when not able to flush", Fixtu TEST_F("require that flush target returns valid task when able to flush again", Fixture) { f._lidSpace->setCanShrink(false); - auto task = f._ft->initFlush(20); + auto task = f._ft->initFlush(20, std::make_shared<search::FlushToken>()); EXPECT_FALSE(validTask(task)); EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum()); EXPECT_NOT_EQUAL(IFlushTarget::Time(), f._ft->getLastFlushTime()); f._lidSpace->setCanShrink(true); auto memoryGain = f._ft->getApproxMemoryGain(); EXPECT_EQUAL(16, memoryGain.gain()); - task = f._ft->initFlush(20); + task = f._ft->initFlush(20, std::make_shared<search::FlushToken>()); EXPECT_TRUE(validTask(task)); task->run(); EXPECT_EQUAL(20u, f._ft->getFlushedSerialNum()); diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index 6831dd60bd5..acd5c86fd5d 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -5,6 +5,7 @@ #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcorespi/index/fusionrunner.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/diskindex/diskindex.h> #include <vespa/searchlib/diskindex/indexbuilder.h> #include <vespa/searchlib/fef/matchdatalayout.h> @@ -252,13 +253,13 @@ void Test::checkResults(uint32_t fusion_id, const uint32_t *ids, size_t size) { } void Test::requireThatNoDiskIndexesGiveId0() { - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); EXPECT_EQUAL(0u, fusion_id); } void Test::requireThatOneDiskIndexCausesCopy() { createIndex(base_dir, disk_id[0]); - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); EXPECT_EQUAL(disk_id[0], fusion_id); set<uint32_t> fusion_ids = readFusionIds(base_dir); ASSERT_TRUE(!fusion_ids.empty()); @@ -271,7 +272,7 @@ void Test::requireThatOneDiskIndexCausesCopy() { void Test::requireThatTwoDiskIndexesCauseFusion() { createIndex(base_dir, disk_id[0]); createIndex(base_dir, disk_id[1]); - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); EXPECT_EQUAL(disk_id[1], fusion_id); set<uint32_t> fusion_ids = readFusionIds(base_dir); ASSERT_TRUE(!fusion_ids.empty()); @@ -286,7 +287,7 @@ void Test::requireThatFusionCanRunOnMultipleDiskIndexes() { createIndex(base_dir, disk_id[1]); createIndex(base_dir, disk_id[2]); createIndex(base_dir, disk_id[3]); - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); EXPECT_EQUAL(disk_id[3], fusion_id); set<uint32_t> fusion_ids = readFusionIds(base_dir); ASSERT_TRUE(!fusion_ids.empty()); @@ -299,7 +300,7 @@ void Test::requireThatFusionCanRunOnMultipleDiskIndexes() { void Test::requireThatOldFusionIndexCanBePartOfNewFusion() { createIndex(base_dir, disk_id[0], true); createIndex(base_dir, disk_id[1]); - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); EXPECT_EQUAL(disk_id[1], fusion_id); set<uint32_t> fusion_ids = readFusionIds(base_dir); ASSERT_TRUE(!fusion_ids.empty()); @@ -313,12 +314,12 @@ void Test::requireThatOldFusionIndexCanBePartOfNewFusion() { void Test::requireThatSelectorsCanBeRebased() { createIndex(base_dir, disk_id[0]); createIndex(base_dir, disk_id[1]); - uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + uint32_t fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); _fusion_spec.flush_ids.clear(); _fusion_spec.last_fusion_id = fusion_id; createIndex(base_dir, disk_id[2]); - fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops); + fusion_id = _fusion_runner->fuse(_fusion_spec, 0u, _ops, std::make_shared<search::FlushToken>()); checkResults(fusion_id, disk_id, 3); } diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 89594821803..67eb11cee3e 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -8,6 +8,7 @@ #include <vespa/searchcorespi/index/indexflushtarget.h> #include <vespa/searchcorespi/index/indexfusiontarget.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/common/serialnum.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -278,7 +279,7 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) IndexFlushTarget target(_index_manager->getMaintainer()); EXPECT_EQ(vespalib::system_time(), target.getLastFlushTime()); vespalib::Executor::Task::UP flushTask; - runAsMaster([&]() { flushTask = target.initFlush(1); }); + runAsMaster([&]() { flushTask = target.initFlush(1, std::make_shared<search::FlushToken>()); }); flushTask->run(); EXPECT_TRUE(FastOS_File::Stat("test_data/index.flush.1", &stat)); EXPECT_EQ(seconds(stat._modifiedTime), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch())); @@ -305,7 +306,7 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) std::this_thread::sleep_for(8s); std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch()); vespalib::Executor::Task::UP task; - runAsMaster([&]() { task = target.initFlush(2); }); + runAsMaster([&]() { task = target.initFlush(2, std::make_shared<search::FlushToken>()); }); EXPECT_FALSE(task); EXPECT_EQ(2u, target.getFlushedSerialNum()); EXPECT_LT(seconds(stat._modifiedTime), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch())); @@ -480,7 +481,7 @@ TEST_F(IndexManagerTest, require_that_fusion_updates_indexes) FusionSpec fusion_spec; fusion_spec.flush_ids.assign(ids, ids + 4); - _index_manager->getMaintainer().runFusion(fusion_spec); + _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>()); set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion"); EXPECT_EQ(1u, fusion_ids.size()); @@ -502,7 +503,7 @@ TEST_F(IndexManagerTest, require_that_flush_triggers_fusion) flushIndexManager(); } IFlushTarget::SP target(new IndexFusionTarget(_index_manager->getMaintainer())); - target->initFlush(0)->run(); + target->initFlush(0, std::make_shared<search::FlushToken>())->run(); addDocument(docid); flushIndexManager(); set<uint32_t> fusion_ids = readDiskIds(index_dir, "fusion"); @@ -549,7 +550,7 @@ TEST_F(IndexManagerTest, require_that_fusion_cleans_up_old_indexes) FusionSpec fusion_spec; fusion_spec.flush_ids.push_back(1); fusion_spec.flush_ids.push_back(2); - _index_manager->getMaintainer().runFusion(fusion_spec); + _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>()); flush_ids = readDiskIds(index_dir, "flush"); EXPECT_EQ(1u, flush_ids.size()); @@ -597,7 +598,7 @@ TEST_F(IndexManagerTest, require_that_disk_indexes_are_loaded_on_startup) FusionSpec fusion_spec; fusion_spec.flush_ids.push_back(1); fusion_spec.flush_ids.push_back(2); - _index_manager->getMaintainer().runFusion(fusion_spec); + _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>()); _index_manager.reset(0); ASSERT_TRUE(!indexExists("flush", 1)); @@ -644,7 +645,7 @@ TEST_F(IndexManagerTest, require_that_existing_indexes_are_to_be_fusioned_on_sta resetIndexManager(); IFlushTarget::SP target(new IndexFusionTarget(_index_manager->getMaintainer())); - target->initFlush(0)->run(); + target->initFlush(0, std::make_shared<search::FlushToken>())->run(); addDocument(docid); flushIndexManager(); @@ -670,7 +671,7 @@ TEST_F(IndexManagerTest, require_that_serial_number_is_copied_on_fusion) FusionSpec fusion_spec; fusion_spec.flush_ids.push_back(1); fusion_spec.flush_ids.push_back(2); - _index_manager->getMaintainer().runFusion(fusion_spec); + _index_manager->getMaintainer().runFusion(fusion_spec, std::make_shared<search::FlushToken>()); FastOS_File file((index_dir + "/index.fusion.2/serial.dat").c_str()); EXPECT_TRUE(file.OpenReadOnly()); } @@ -711,7 +712,7 @@ TEST_F(IndexManagerTest, require_that_failed_fusion_is_retried) crippleFusion(2); IndexFusionTarget target(_index_manager->getMaintainer()); - vespalib::Executor::Task::UP fusionTask = target.initFlush(1); + vespalib::Executor::Task::UP fusionTask = target.initFlush(1, std::make_shared<search::FlushToken>()); fusionTask->run(); FusionSpec spec = _index_manager->getMaintainer().getFusionSpec(); diff --git a/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp b/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp index 4ed01f8caf7..2238e3c4f0c 100644 --- a/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp +++ b/searchcore/src/tests/proton/metrics/job_tracked_flush/job_tracked_flush_test.cpp @@ -6,6 +6,7 @@ LOG_SETUP("job_tracked_flush_test"); #include <vespa/searchcore/proton/metrics/job_tracked_flush_task.h> #include <vespa/searchcore/proton/test/dummy_flush_target.h> #include <vespa/searchcore/proton/test/simple_job_tracker.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/threadstackexecutor.h> @@ -46,7 +47,7 @@ struct MyFlushTarget : public test::DummyFlushTarget {} // Implements searchcorespi::IFlushTarget - virtual FlushTask::UP initFlush(SerialNum currentSerial) override { + virtual FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) override { if (currentSerial > 0) { _initFlushSerial = currentSerial; _initGate.await(5000); @@ -74,7 +75,7 @@ struct Fixture { } void initFlush(SerialNum currentSerial) { - _task = _trackedFlush.initFlush(currentSerial); + _task = _trackedFlush.initFlush(currentSerial, std::make_shared<search::FlushToken>()); _taskGate.countDown(); } }; @@ -130,7 +131,7 @@ TEST_F("require that flush task execution is tracked", Fixture(2)) TEST_F("require that nullptr flush task is not tracked", Fixture) { - FlushTask::UP task = f._trackedFlush.initFlush(0); + FlushTask::UP task = f._trackedFlush.initFlush(0, std::make_shared<search::FlushToken>()); EXPECT_TRUE(task.get() == nullptr); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp index af7bae32b11..6399c52696c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp @@ -3,6 +3,7 @@ #include "attribute_populator.h" #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/gate.h> #include <vespa/searchlib/attribute/attributevector.h> @@ -88,7 +89,7 @@ AttributePopulator::done() auto flushTargets = mgr->getFlushTargets(); for (const auto &flushTarget : flushTargets) { assert(flushTarget->getFlushedSerialNum() < _configSerialNum); - auto task = flushTarget->initFlush(_configSerialNum); + auto task = flushTarget->initFlush(_configSerialNum, std::make_shared<search::FlushToken>()); // shrink target only return task if able to shrink. if (task) { task->run(); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index 504f6841daf..cc1af923fc0 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -13,6 +13,7 @@ #include <vespa/searchlib/attribute/attributecontext.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> #include <vespa/searchlib/attribute/imported_attribute_vector.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchcommon/attribute/i_attribute_functor.h> #include <vespa/searchlib/attribute/interlock.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> @@ -326,7 +327,7 @@ AttributeManager::flushAll(SerialNum currentSerial) auto flushTargets = getFlushTargets(); for (const auto &ft : flushTargets) { vespalib::Executor::Task::UP task; - task = ft->initFlush(currentSerial); + task = ft->initFlush(currentSerial, std::make_shared<search::FlushToken>()); if (task.get() != NULL) { task->run(); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index 4edb64b861a..9bff8e2a438 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -225,7 +225,7 @@ FlushableAttribute::internalInitFlush(SerialNum currentSerial) IFlushTarget::Task::UP -FlushableAttribute::initFlush(SerialNum currentSerial) +FlushableAttribute::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { // Called by document db executor std::promise<IFlushTarget::Task::UP> promise; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h index a759bcce26e..c6654392372 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h @@ -69,7 +69,7 @@ public: virtual DiskGain getApproxDiskGain() const override; virtual Time getLastFlushTime() const override; virtual SerialNum getFlushedSerialNum() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; virtual double get_replay_operation_cost() const override; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp index 79f32d60056..6bda77f97a0 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.cpp @@ -74,7 +74,7 @@ SummaryCompactTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -SummaryCompactTarget::initFlush(SerialNum currentSerial) +SummaryCompactTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { std::promise<Task::UP> promise; std::future<Task::UP> future = promise.get_future(); diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h index 5dcfb2b9c10..8efd4c0d3bf 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarycompacttarget.h @@ -28,7 +28,7 @@ public: virtual SerialNum getFlushedSerialNum() const override; virtual Time getLastFlushTime() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp index a101ad4d83c..8103a28db5f 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.cpp @@ -85,7 +85,7 @@ SummaryFlushTarget::internalInitFlush(SerialNum currentSerial) { return std::make_unique<Flusher>(_docStore, _lastStats, currentSerial); } IFlushTarget::Task::UP -SummaryFlushTarget::initFlush(SerialNum currentSerial) +SummaryFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { // Called by document db executor std::promise<Task::UP> promise; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h index 7e450649a52..4912d99f7f7 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summaryflushtarget.h @@ -29,7 +29,7 @@ public: virtual SerialNum getFlushedSerialNum() const override; virtual Time getLastFlushTime() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index 6abbf477959..60853f765ea 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -53,7 +53,7 @@ public: searchcorespi::index::IThreadService & summaryService, std::shared_ptr<ICompactableLidSpace> target); ~ShrinkSummaryLidSpaceFlushTarget() override; - Task::UP initFlush(SerialNum currentSerial) override; + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; }; ShrinkSummaryLidSpaceFlushTarget:: @@ -69,11 +69,11 @@ ShrinkSummaryLidSpaceFlushTarget(const vespalib::string &name, Type type, Compon ShrinkSummaryLidSpaceFlushTarget::~ShrinkSummaryLidSpaceFlushTarget() = default; IFlushTarget::Task::UP -ShrinkSummaryLidSpaceFlushTarget::initFlush(SerialNum currentSerial) +ShrinkSummaryLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { std::promise<Task::UP> promise; std::future<Task::UP> future = promise.get_future(); - _summaryService.execute(makeLambdaTask([&]() { promise.set_value(ShrinkLidSpaceFlushTarget::initFlush(currentSerial)); })); + _summaryService.execute(makeLambdaTask([&]() { promise.set_value(ShrinkLidSpaceFlushTarget::initFlush(currentSerial, flush_token)); })); return future.get(); } diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp index ad3a56b3d56..dcfa7b13970 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.cpp @@ -211,7 +211,7 @@ DocumentMetaStoreFlushTarget::getLastFlushTime() const IFlushTarget::Task::UP -DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial) +DocumentMetaStoreFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { // Called by document db executor _dms->removeAllOldGenerations(); diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h index 5cc674d3614..c670279e3fa 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/documentmetastoreflushtarget.h @@ -58,7 +58,7 @@ public: DiskGain getApproxDiskGain() const override; Time getLastFlushTime() const override; SerialNum getFlushedSerialNum() const override; - Task::UP initFlush(SerialNum currentSerial) override; + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; FlushStats getLastFlushStats() const override { return _lastStats; } static void initCleanup(const vespalib::string &baseDir); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h index e2bf9783710..ecd66f9cbfb 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/cachedflushtarget.h @@ -48,7 +48,7 @@ public: virtual Time getLastFlushTime() const override { return _lastFlushTime; } virtual bool needUrgentFlush() const override { return _needUrgentFlush; } - virtual Task::UP initFlush(SerialNum currentSerial) override { return _target->initFlush(currentSerial); } + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override { return _target->initFlush(currentSerial, std::move(flush_token)); } virtual FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } virtual uint64_t getApproxBytesToWriteToDisk() const override; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp index ebbe2171e63..b569e499876 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.cpp @@ -34,10 +34,10 @@ FlushContext::~FlushContext() } bool -FlushContext::initFlush() +FlushContext::initFlush(std::shared_ptr<search::IFlushToken> flush_token) { LOG(debug, "Attempting to flush '%s'.", _name.c_str()); - _task = _target->initFlush(std::max(_handler->getCurrentSerialNumber(), _lastSerial)); + _task = _target->initFlush(std::max(_handler->getCurrentSerialNumber(), _lastSerial), std::move(flush_token)); if (_task.get() == NULL) { LOG(debug, "Target refused to init flush."); return false; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h index 7ce8210db2f..2c380cc28ed 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushcontext.h @@ -54,9 +54,9 @@ public: * signature. If this method returns true, the task to complete the flush is * available through getTask(). * - * @param True if a flush was initiated. + * @return True if a flush was initiated. */ - bool initFlush(); + bool initFlush(std::shared_ptr<search::IFlushToken> flush_token); /** * Returns the unique name of this context. This is the concatenation of the diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index a177aa608e7..5f35ecc916d 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -7,6 +7,7 @@ #include "tls_stats_map.h" #include "tls_stats_factory.h" #include <vespa/searchcore/proton/common/eventlogger.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/vespalib/util/jsonwriter.h> #include <thread> @@ -276,7 +277,7 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) if (LOG_WOULD_LOG(event)) { EventLogger::flushInit(it->getName()); } - if (it->initFlush()) { + if (it->initFlush(std::make_shared<search::FlushToken>())) { ctx = it; break; } @@ -293,7 +294,7 @@ FlushEngine::flushAll(const FlushContext::List &lst) LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { if (wait(0)) { - if (ctx->initFlush()) { + if (ctx->initFlush(std::make_shared<search::FlushToken>())) { logTarget("initiated", *ctx); _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); } else { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp index d51d5a113cb..f264555be1a 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.cpp @@ -59,9 +59,9 @@ FlushTargetProxy::needUrgentFlush() const IFlushTarget::Task::UP -FlushTargetProxy::initFlush(SerialNum currentSerial) +FlushTargetProxy::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { - return _target->initFlush(currentSerial); + return _target->initFlush(currentSerial, std::move(flush_token)); } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h index 2b50bf2e222..84bf91a2f9a 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushtargetproxy.h @@ -62,7 +62,7 @@ public: needUrgentFlush() const override; virtual Task::UP - initFlush(SerialNum currentSerial) override; + initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual searchcorespi::FlushStats getLastFlushStats() const override; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp index e9c2554f13c..45fcd1d19aa 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.cpp @@ -87,7 +87,7 @@ ShrinkLidSpaceFlushTarget::needUrgentFlush() const } IFlushTarget::Task::UP -ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial) +ShrinkLidSpaceFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken>) { if (currentSerial < _flushedSerialNum) { _lastFlushTime = vespalib::system_clock::now(); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h index c1ecb2c2b4a..ebd8a54004b 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h @@ -47,7 +47,7 @@ public: SerialNum getFlushedSerialNum() const override; Time getLastFlushTime() const override; bool needUrgentFlush() const override; - Task::UP initFlush(SerialNum currentSerial) override; + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; searchcorespi::FlushStats getLastFlushStats() const override; uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp index 3c1ac1b5361..59a5504b57b 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.cpp @@ -34,25 +34,26 @@ ThreadedFlushTarget::ThreadedFlushTarget(vespalib::Executor &executor, namespace { IFlushTarget::Task::UP callInitFlush(IFlushTarget *target, IFlushTarget::SerialNum serial, - const IGetSerialNum *getSerialNum) { + const IGetSerialNum *getSerialNum, std::shared_ptr<search::IFlushToken> flush_token) { // Serial number from flush engine might have become stale, obtain // a fresh serial number now. (void) serial; search::SerialNum freshSerial = getSerialNum->getSerialNum(); assert(freshSerial >= serial); - return target->initFlush(freshSerial); + return target->initFlush(freshSerial, std::move(flush_token)); } } // namespace IFlushTarget::Task::UP -ThreadedFlushTarget::initFlush(SerialNum currentSerial) +ThreadedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { std::promise<Task::UP> promise; std::future<Task::UP> future = promise.get_future(); _executor.execute(makeLambdaTask([&]() { promise.set_value(callInitFlush(_target.get(), currentSerial, - &_getSerialNum)); })); + &_getSerialNum, + flush_token)); })); return future.get(); } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h index 26cdb6903d3..cd2ee11265f 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/threadedflushtarget.h @@ -50,7 +50,7 @@ public: // Implements IFlushTarget. virtual Task::UP - initFlush(SerialNum currentSerial) override; + initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp index d93f02c7994..69790b3dceb 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp @@ -3,7 +3,6 @@ #include "indexmanager.h" #include "diskindexwrapper.h" #include "memoryindexwrapper.h" -#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/common/serialnumfileheadercontext.h> #include <vespa/searchlib/diskindex/fusion.h> @@ -12,7 +11,7 @@ using search::common::FileHeaderContext; using search::common::SerialNumFileHeaderContext; using search::index::Schema; using search::index::SchemaUtil; -using search::FlushToken; +using search::IFlushToken; using search::TuneFileIndexing; using search::TuneFileIndexManager; using search::TuneFileSearch; @@ -65,12 +64,13 @@ IndexManager::MaintainerOperations::runFusion(const Schema &schema, const vespalib::string &outputDir, const std::vector<vespalib::string> &sources, const SelectorArray &selectorArray, - SerialNum serialNum) + SerialNum serialNum, + std::shared_ptr<IFlushToken> flush_token) { SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, serialNum); const bool dynamic_k_doc_pos_occ_format = false; return Fusion::merge(schema, outputDir, sources, selectorArray, dynamic_k_doc_pos_occ_format, - _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::make_shared<FlushToken>()); + _tuneFileIndexing, fileHeaderContext, _threadingService.shared(), std::move(flush_token)); } diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h index 7e305681f78..7cf25292d1c 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h @@ -56,7 +56,8 @@ public: bool runFusion(const Schema &schema, const vespalib::string &outputDir, const std::vector<vespalib::string> &sources, const SelectorArray &docIdSelector, - search::SerialNum lastSerialNum) override; + search::SerialNum lastSerialNum, + std::shared_ptr<search::IFlushToken> flush_token) override; }; private: diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp index 14d76645fc7..cb75472a748 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp @@ -19,10 +19,10 @@ JobTrackedFlushTarget::JobTrackedFlushTarget(const IJobTracker::SP &tracker, JobTrackedFlushTarget::~JobTrackedFlushTarget() {} FlushTask::UP -JobTrackedFlushTarget::initFlush(SerialNum currentSerial) +JobTrackedFlushTarget::initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) { _tracker->start(); - FlushTask::UP targetTask = _target->initFlush(currentSerial); + FlushTask::UP targetTask = _target->initFlush(currentSerial, std::move(flush_token)); _tracker->end(); if (targetTask.get() != nullptr) { return std::make_unique<JobTrackedFlushTask>(_tracker, std::move(targetTask)); diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h index fdfe6986ec3..053f6b75ddd 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h @@ -40,7 +40,7 @@ public: virtual bool needUrgentFlush() const override { return _target->needUrgentFlush(); } - virtual searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial) override; + virtual searchcorespi::FlushTask::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual searchcorespi::FlushStats getLastFlushStats() const override { return _target->getLastFlushStats(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp index 0bd50fc0104..7dbf54cfd6c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp @@ -19,6 +19,7 @@ #include <vespa/searchcore/proton/matching/sessionmanager.h> #include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h> #include <vespa/searchlib/attribute/configconverter.h> +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/docstore/document_store_visitor_progress.h> #include <vespa/searchlib/util/fileheadertk.h> #include <vespa/vespalib/io/fileutil.h> @@ -482,7 +483,7 @@ StoreOnlyDocSubDB::close() assert(_writeService.master().isCurrentThread()); search::IDocumentStore & store(_rSummaryMgr->getBackingStore()); auto summaryFlush = std::make_shared<SummaryFlushTarget>(store, _writeService.summary()); - auto summaryFlushTask = summaryFlush->initFlush(store.tentativeLastSyncToken()); + auto summaryFlushTask = summaryFlush->initFlush(store.tentativeLastSyncToken(), std::make_shared<search::FlushToken>()); if (summaryFlushTask) { SerialNum syncToken = summaryFlushTask->getFlushSerial(); _tlSyncer.sync(syncToken); diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h index dd3cd49df2f..4178780da98 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h @@ -20,7 +20,7 @@ struct DummyFlushTarget : public searchcorespi::IFlushTarget SerialNum getFlushedSerialNum() const override { return 0; } Time getLastFlushTime() const override { return Time(); } bool needUrgentFlush() const override { return false; } - searchcorespi::FlushTask::UP initFlush(SerialNum) override { + searchcorespi::FlushTask::UP initFlush(SerialNum, std::shared_ptr<search::IFlushToken>) override { return searchcorespi::FlushTask::UP(); } searchcorespi::FlushStats getLastFlushStats() const override { diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h index 3a67333c9d5..36a4a320ad4 100644 --- a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h @@ -6,6 +6,8 @@ #include <vespa/vespalib/util/time.h> #include <vector> +namespace search { class IFlushToken; } + namespace searchcorespi { /** @@ -172,7 +174,7 @@ public: * @param currentSerial The current transaction serial number. * @return The task used to complete the flush. */ - virtual Task::UP initFlush(SerialNum currentSerial) = 0; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0; /** * Returns the stats for the last completed flush operation diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp index 841b24af576..f7e818d2d0b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp @@ -84,7 +84,8 @@ writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id, uint32_t FusionRunner::fuse(const FusionSpec &fusion_spec, SerialNum lastSerialNum, - IIndexMaintainerOperations &operations) + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token) { const vector<uint32_t> &ids = fusion_spec.flush_ids; if (ids.empty()) { @@ -113,7 +114,7 @@ FusionRunner::fuse(const FusionSpec &fusion_spec, SelectorArray selector_array; readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id); - if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum)) { + if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) { return 0; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h index 44323b06932..ddb23c33dd6 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h @@ -56,7 +56,8 @@ public: **/ uint32_t fuse(const FusionSpec &fusion_spec, search::SerialNum lastSerialNum, - IIndexMaintainerOperations &operations); + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token); }; } // namespace index diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h index 99f17b12b79..49f463c3631 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h @@ -8,6 +8,8 @@ #include <vespa/searchlib/diskindex/docidmapper.h> #include <vespa/searchlib/index/i_field_length_inspector.h> +namespace search { class IFlushToken; } + namespace searchcorespi::index { /** @@ -52,7 +54,8 @@ struct IIndexMaintainerOperations { const vespalib::string &outputDir, const std::vector<vespalib::string> &sources, const SelectorArray &selectorArray, - search::SerialNum lastSerialNum) = 0; + search::SerialNum lastSerialNum, + std::shared_ptr<search::IFlushToken> flush_token) = 0; }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp index e7bc26b8dd1..a88b5eed414 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -60,7 +60,7 @@ IndexFlushTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFlushTarget::initFlush(SerialNum serialNum) +IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>) { // the target must live until this task is done (handled by flush engine). return _indexMaintainer.initFlush(serialNum, &_lastStats); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h index 8769a3ee66d..50ae2000a11 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h @@ -30,7 +30,7 @@ public: virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp index b538e817cf8..64f4584e465 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -15,15 +15,17 @@ private: IndexMaintainer &_indexMaintainer; FlushStats &_stats; SerialNum _serialNum; + std::shared_ptr<search::IFlushToken> _flush_token; public: - Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) : + Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) : _indexMaintainer(indexMaintainer), _stats(stats), - _serialNum(serialNum) + _serialNum(serialNum), + _flush_token(std::move(flush_token)) {} void run() override { - vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum); + vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token); // the target must live until this task is done (handled by flush engine). _stats.setPath(outputFusionDir); } @@ -86,9 +88,9 @@ IndexFusionTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFusionTarget::initFlush(SerialNum serialNum) +IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) { - return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum); + return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token)); } uint64_t diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h index 98a61fb12ed..e4a9c803101 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -26,7 +26,7 @@ public: virtual Time getLastFlushTime() const override; virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index c75a74ff141..e2bcb8b7629 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -962,7 +962,7 @@ IndexMaintainer::getFusionSpec() } string -IndexMaintainer::doFusion(SerialNum serialNum) +IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) { // Called by a flush engine worker thread @@ -984,7 +984,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) _fusion_spec.flush_ids.clear(); } - uint32_t new_fusion_id = runFusion(spec); + uint32_t new_fusion_id = runFusion(spec, std::move(flush_token)); LockGuard lock(_fusion_lock); if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. @@ -1000,7 +1000,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) uint32_t -IndexMaintainer::runFusion(const FusionSpec &fusion_spec) +IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token) { // Called by a flush engine worker thread FusionArgs args; @@ -1020,7 +1020,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec) serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); } FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); - uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token)); bool ok = (new_fusion_id != 0); if (ok) { ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 5ff805b53f7..d9d2479833f 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -292,8 +292,8 @@ public: /** * Runs fusion for any available specs and return the output fusion directory. */ - vespalib::string doFusion(SerialNum serialNum); - uint32_t runFusion(const FusionSpec &fusion_spec); + vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token); + uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token); void removeOldDiskIndexes(); struct FlushStats { |