diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-06 13:36:23 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-06 13:39:43 +0100 |
commit | de1d36b97e0109229e46f2617432bea6c31a5132 (patch) | |
tree | 53248f21c4ccff84bcd2e541b7c2e84eaf49f0c6 /searchcorespi | |
parent | 293ea711b89d760bdea84f22d1b66ff94dad6667 (diff) |
Wire in use of flush tokens for flush targets.
Diffstat (limited to 'searchcorespi')
10 files changed, 28 insertions, 19 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h index 3a67333c9d5..36a4a320ad4 100644 --- a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h @@ -6,6 +6,8 @@ #include <vespa/vespalib/util/time.h> #include <vector> +namespace search { class IFlushToken; } + namespace searchcorespi { /** @@ -172,7 +174,7 @@ public: * @param currentSerial The current transaction serial number. * @return The task used to complete the flush. */ - virtual Task::UP initFlush(SerialNum currentSerial) = 0; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0; /** * Returns the stats for the last completed flush operation diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp index 841b24af576..f7e818d2d0b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp @@ -84,7 +84,8 @@ writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id, uint32_t FusionRunner::fuse(const FusionSpec &fusion_spec, SerialNum lastSerialNum, - IIndexMaintainerOperations &operations) + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token) { const vector<uint32_t> &ids = fusion_spec.flush_ids; if (ids.empty()) { @@ -113,7 +114,7 @@ FusionRunner::fuse(const FusionSpec &fusion_spec, SelectorArray selector_array; readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id); - if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum)) { + if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) { return 0; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h index 44323b06932..ddb23c33dd6 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h @@ -56,7 +56,8 @@ public: **/ uint32_t fuse(const FusionSpec &fusion_spec, search::SerialNum lastSerialNum, - IIndexMaintainerOperations &operations); + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token); }; } // namespace index diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h index 99f17b12b79..49f463c3631 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h @@ -8,6 +8,8 @@ #include <vespa/searchlib/diskindex/docidmapper.h> #include <vespa/searchlib/index/i_field_length_inspector.h> +namespace search { class IFlushToken; } + namespace searchcorespi::index { /** @@ -52,7 +54,8 @@ struct IIndexMaintainerOperations { const vespalib::string &outputDir, const std::vector<vespalib::string> &sources, const SelectorArray &selectorArray, - search::SerialNum lastSerialNum) = 0; + search::SerialNum lastSerialNum, + std::shared_ptr<search::IFlushToken> flush_token) = 0; }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp index e7bc26b8dd1..a88b5eed414 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -60,7 +60,7 @@ IndexFlushTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFlushTarget::initFlush(SerialNum serialNum) +IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>) { // the target must live until this task is done (handled by flush engine). return _indexMaintainer.initFlush(serialNum, &_lastStats); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h index 8769a3ee66d..50ae2000a11 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h @@ -30,7 +30,7 @@ public: virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp index b538e817cf8..64f4584e465 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -15,15 +15,17 @@ private: IndexMaintainer &_indexMaintainer; FlushStats &_stats; SerialNum _serialNum; + std::shared_ptr<search::IFlushToken> _flush_token; public: - Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) : + Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) : _indexMaintainer(indexMaintainer), _stats(stats), - _serialNum(serialNum) + _serialNum(serialNum), + _flush_token(std::move(flush_token)) {} void run() override { - vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum); + vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token); // the target must live until this task is done (handled by flush engine). _stats.setPath(outputFusionDir); } @@ -86,9 +88,9 @@ IndexFusionTarget::getFlushedSerialNum() const } IFlushTarget::Task::UP -IndexFusionTarget::initFlush(SerialNum serialNum) +IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) { - return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum); + return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token)); } uint64_t diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h index 98a61fb12ed..e4a9c803101 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -26,7 +26,7 @@ public: virtual Time getLastFlushTime() const override; virtual bool needUrgentFlush() const override; - virtual Task::UP initFlush(SerialNum currentSerial) override; + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index c75a74ff141..e2bcb8b7629 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -962,7 +962,7 @@ IndexMaintainer::getFusionSpec() } string -IndexMaintainer::doFusion(SerialNum serialNum) +IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) { // Called by a flush engine worker thread @@ -984,7 +984,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) _fusion_spec.flush_ids.clear(); } - uint32_t new_fusion_id = runFusion(spec); + uint32_t new_fusion_id = runFusion(spec, std::move(flush_token)); LockGuard lock(_fusion_lock); if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. @@ -1000,7 +1000,7 @@ IndexMaintainer::doFusion(SerialNum serialNum) uint32_t -IndexMaintainer::runFusion(const FusionSpec &fusion_spec) +IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token) { // Called by a flush engine worker thread FusionArgs args; @@ -1020,7 +1020,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec) serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); } FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); - uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token)); bool ok = (new_fusion_id != 0); if (ok) { ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 5ff805b53f7..d9d2479833f 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -292,8 +292,8 @@ public: /** * Runs fusion for any available specs and return the output fusion directory. */ - vespalib::string doFusion(SerialNum serialNum); - uint32_t runFusion(const FusionSpec &fusion_spec); + vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token); + uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token); void removeOldDiskIndexes(); struct FlushStats { |