From de1d36b97e0109229e46f2617432bea6c31a5132 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 6 Jan 2021 13:36:23 +0100 Subject: Wire in use of flush tokens for flush targets. --- searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h | 4 +++- searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp | 5 +++-- searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h | 3 ++- .../vespa/searchcorespi/index/iindexmaintaineroperations.h | 5 ++++- .../src/vespa/searchcorespi/index/indexflushtarget.cpp | 2 +- .../src/vespa/searchcorespi/index/indexflushtarget.h | 2 +- .../src/vespa/searchcorespi/index/indexfusiontarget.cpp | 12 +++++++----- .../src/vespa/searchcorespi/index/indexfusiontarget.h | 2 +- .../src/vespa/searchcorespi/index/indexmaintainer.cpp | 8 ++++---- .../src/vespa/searchcorespi/index/indexmaintainer.h | 4 ++-- 10 files changed, 28 insertions(+), 19 deletions(-) (limited to 'searchcorespi') diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h index 3a67333c9d5..36a4a320ad4 100644 --- a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h @@ -6,6 +6,8 @@ #include #include +namespace search { class IFlushToken; } + namespace searchcorespi { /** @@ -172,7 +174,7 @@ public: * @param currentSerial The current transaction serial number. * @return The task used to complete the flush. */ - virtual Task::UP initFlush(SerialNum currentSerial) = 0; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr flush_token) = 0; /** * Returns the stats for the last completed flush operation diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp index 841b24af576..f7e818d2d0b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp @@ -84,7 +84,8 @@ writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id, uint32_t FusionRunner::fuse(const FusionSpec &fusion_spec, SerialNum lastSerialNum, - IIndexMaintainerOperations &operations) + IIndexMaintainerOperations &operations, + std::shared_ptr flush_token) { const vector &ids = fusion_spec.flush_ids; if (ids.empty()) { @@ -113,7 +114,7 @@ FusionRunner::fuse(const FusionSpec &fusion_spec, SelectorArray selector_array; readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id); - if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum)) { + if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) { return 0; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h index 44323b06932..ddb23c33dd6 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h @@ -56,7 +56,8 @@ public: **/ uint32_t fuse(const FusionSpec &fusion_spec, search::SerialNum lastSerialNum, - IIndexMaintainerOperations &operations); + IIndexMaintainerOperations &operations, + std::shared_ptr flush_token); }; } // namespace index diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h index 99f17b12b79..49f463c3631 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h @@ -8,6 +8,8 @@ #include #include +namespace search { class IFlushToken; } + namespace searchcorespi::index { /** @@ -52,7 +54,8 @@ struct IIndexMaintainerOperations { const vespalib::string &outputDir, const std::vector &sources, const SelectorArray &selectorArray, - search::SerialNum lastSerialNum) = 0; + search::SerialNum lastSerialNum, + std::shared_ptr flush_token) = 0; }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp index e7bc26b8dd1..a88b5eed414 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -60,7 +60,7 @@ IndexFlushTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFlushTarget::initFlush(SerialNum serialNum) +IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr) { // the target must live until this task is done (handled by flush engine). return _indexMaintainer.initFlush(serialNum, &_lastStats); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h index 8769a3ee66d..50ae2000a11 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h @@ -30,7 +30,7 @@ public: virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp index b538e817cf8..64f4584e465 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -15,15 +15,17 @@ private: IndexMaintainer &_indexMaintainer; FlushStats &_stats; SerialNum _serialNum; + std::shared_ptr _flush_token; public: - Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) : + Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr flush_token) : _indexMaintainer(indexMaintainer), _stats(stats), - _serialNum(serialNum) + _serialNum(serialNum), + _flush_token(std::move(flush_token)) {} void run() override { - vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum); + vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token); // the target must live until this task is done (handled by flush engine). _stats.setPath(outputFusionDir); } @@ -86,9 +88,9 @@ IndexFusionTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFusionTarget::initFlush(SerialNum serialNum) +IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr flush_token) { - return std::make_unique(_indexMaintainer, _lastStats, serialNum); + return std::make_unique(_indexMaintainer, _lastStats, serialNum, std::move(flush_token)); } uint64_t diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h index 98a61fb12ed..e4a9c803101 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -26,7 +26,7 @@ public: virtual Time getLastFlushTime() const override; virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index c75a74ff141..e2bcb8b7629 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -962,7 +962,7 @@ IndexMaintainer::getFusionSpec() } string -IndexMaintainer::doFusion(SerialNum serialNum) +IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr flush_token) { // Called by a flush engine worker thread @@ -984,7 +984,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) _fusion_spec.flush_ids.clear(); } - uint32_t new_fusion_id = runFusion(spec); + uint32_t new_fusion_id = runFusion(spec, std::move(flush_token)); LockGuard lock(_fusion_lock); if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. @@ -1000,7 +1000,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) uint32_t -IndexMaintainer::runFusion(const FusionSpec &fusion_spec) +IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr flush_token) { // Called by a flush engine worker thread FusionArgs args; @@ -1020,7 +1020,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec) serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); } FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); - uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token)); bool ok = (new_fusion_id != 0); if (ok) { ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 5ff805b53f7..d9d2479833f 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -292,8 +292,8 @@ public: /** * Runs fusion for any available specs and return the output fusion directory. */ - vespalib::string doFusion(SerialNum serialNum); - uint32_t runFusion(const FusionSpec &fusion_spec); + vespalib::string doFusion(SerialNum serialNum, std::shared_ptr flush_token); + uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr flush_token); void removeOldDiskIndexes(); struct FlushStats { -- cgit v1.2.3