aboutsummaryrefslogtreecommitdiffstats
path: root/searchcorespi
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-06 13:36:23 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-06 13:39:43 +0100
commitde1d36b97e0109229e46f2617432bea6c31a5132 (patch)
tree53248f21c4ccff84bcd2e541b7c2e84eaf49f0c6 /searchcorespi
parent293ea711b89d760bdea84f22d1b66ff94dad6667 (diff)
Wire in use of flush tokens for flush targets.
Diffstat (limited to 'searchcorespi')
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp5
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h3
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h5
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp12
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp8
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h4
10 files changed, 28 insertions, 19 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
index 3a67333c9d5..36a4a320ad4 100644
--- a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
@@ -6,6 +6,8 @@
#include <vespa/vespalib/util/time.h>
#include <vector>
+namespace search { class IFlushToken; }
+
namespace searchcorespi {
/**
@@ -172,7 +174,7 @@ public:
* @param currentSerial The current transaction serial number.
* @return The task used to complete the flush.
*/
- virtual Task::UP initFlush(SerialNum currentSerial) = 0;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0;
/**
* Returns the stats for the last completed flush operation
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
index 841b24af576..f7e818d2d0b 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
@@ -84,7 +84,8 @@ writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id,
uint32_t
FusionRunner::fuse(const FusionSpec &fusion_spec,
SerialNum lastSerialNum,
- IIndexMaintainerOperations &operations)
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token)
{
const vector<uint32_t> &ids = fusion_spec.flush_ids;
if (ids.empty()) {
@@ -113,7 +114,7 @@ FusionRunner::fuse(const FusionSpec &fusion_spec,
SelectorArray selector_array;
readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id);
- if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum)) {
+ if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) {
return 0;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
index 44323b06932..ddb23c33dd6 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
@@ -56,7 +56,8 @@ public:
**/
uint32_t fuse(const FusionSpec &fusion_spec,
search::SerialNum lastSerialNum,
- IIndexMaintainerOperations &operations);
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token);
};
} // namespace index
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
index 99f17b12b79..49f463c3631 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
@@ -8,6 +8,8 @@
#include <vespa/searchlib/diskindex/docidmapper.h>
#include <vespa/searchlib/index/i_field_length_inspector.h>
+namespace search { class IFlushToken; }
+
namespace searchcorespi::index {
/**
@@ -52,7 +54,8 @@ struct IIndexMaintainerOperations {
const vespalib::string &outputDir,
const std::vector<vespalib::string> &sources,
const SelectorArray &selectorArray,
- search::SerialNum lastSerialNum) = 0;
+ search::SerialNum lastSerialNum,
+ std::shared_ptr<search::IFlushToken> flush_token) = 0;
};
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
index e7bc26b8dd1..a88b5eed414 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -60,7 +60,7 @@ IndexFlushTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
-IndexFlushTarget::initFlush(SerialNum serialNum)
+IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>)
{
// the target must live until this task is done (handled by flush engine).
return _indexMaintainer.initFlush(serialNum, &_lastStats);
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
index 8769a3ee66d..50ae2000a11 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
@@ -30,7 +30,7 @@ public:
virtual bool needUrgentFlush() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
};
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
index b538e817cf8..64f4584e465 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -15,15 +15,17 @@ private:
IndexMaintainer &_indexMaintainer;
FlushStats &_stats;
SerialNum _serialNum;
+ std::shared_ptr<search::IFlushToken> _flush_token;
public:
- Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) :
+ Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) :
_indexMaintainer(indexMaintainer),
_stats(stats),
- _serialNum(serialNum)
+ _serialNum(serialNum),
+ _flush_token(std::move(flush_token))
{}
void run() override {
- vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum);
+ vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token);
// the target must live until this task is done (handled by flush engine).
_stats.setPath(outputFusionDir);
}
@@ -86,9 +88,9 @@ IndexFusionTarget::getFlushedSerialNum() const
}
IFlushTarget::Task::UP
-IndexFusionTarget::initFlush(SerialNum serialNum)
+IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
{
- return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum);
+ return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token));
}
uint64_t
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
index 98a61fb12ed..e4a9c803101 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
@@ -26,7 +26,7 @@ public:
virtual Time getLastFlushTime() const override;
virtual bool needUrgentFlush() const override;
- virtual Task::UP initFlush(SerialNum currentSerial) override;
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
virtual FlushStats getLastFlushStats() const override { return _lastStats; }
virtual uint64_t getApproxBytesToWriteToDisk() const override;
};
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index c75a74ff141..e2bcb8b7629 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -962,7 +962,7 @@ IndexMaintainer::getFusionSpec()
}
string
-IndexMaintainer::doFusion(SerialNum serialNum)
+IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
{
// Called by a flush engine worker thread
@@ -984,7 +984,7 @@ IndexMaintainer::doFusion(SerialNum serialNum)
_fusion_spec.flush_ids.clear();
}
- uint32_t new_fusion_id = runFusion(spec);
+ uint32_t new_fusion_id = runFusion(spec, std::move(flush_token));
LockGuard lock(_fusion_lock);
if (new_fusion_id == spec.last_fusion_id) { // Error running fusion.
@@ -1000,7 +1000,7 @@ IndexMaintainer::doFusion(SerialNum serialNum)
uint32_t
-IndexMaintainer::runFusion(const FusionSpec &fusion_spec)
+IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token)
{
// Called by a flush engine worker thread
FusionArgs args;
@@ -1020,7 +1020,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec)
serialNum = IndexReadUtilities::readSerialNum(lastFlushDir);
}
FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext());
- uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations);
+ uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, std::move(flush_token));
bool ok = (new_fusion_id != 0);
if (ok) {
ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()),
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index 5ff805b53f7..d9d2479833f 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -292,8 +292,8 @@ public:
/**
* Runs fusion for any available specs and return the output fusion directory.
*/
- vespalib::string doFusion(SerialNum serialNum);
- uint32_t runFusion(const FusionSpec &fusion_spec);
+ vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token);
+ uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token);
void removeOldDiskIndexes();
struct FlushStats {