aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-03-09 15:18:04 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-03-09 15:18:04 +0000
commit9d05cfd466959c68b9b633b876016678c253fed6 (patch)
tree2cc8155f2c83cd76024cbd92b6c421b53af9abe0
parent7743992d1dd55b635a297ad97546d4fbfd5ce23f (diff)
Make current serial number in IndexMaintainer atomic
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp30
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h27
2 files changed, 32 insertions, 25 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index 20aba93d4d7..2e53db3bd87 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -490,7 +490,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
args->old_absolute_id = _current_index_id + _last_fusion_id;
args->old_source_list = _source_list;
string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id));
- args->flush_serial_num = _current_serial_num;
+ args->flush_serial_num = current_serial_num();
{
LockGuard lock(_index_update_lock);
// Handover of extra memory indexes to flush
@@ -795,7 +795,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
args._oldSourceList = _source_list; // Delay destruction
uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id;
string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId));
- SerialNum freezeSerialNum = _current_serial_num;
+ SerialNum freezeSerialNum = current_serial_num();
bool dropEmptyLast = false;
SaveInfo::UP saveInfo;
@@ -921,7 +921,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir);
_lastFlushTime = search::FileKit::getModificationTime(latest_index_dir);
- _current_serial_num = _flush_serial_num;
+ set_current_serial_num(_flush_serial_num);
const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir);
_selector = FixedSourceSelector::load(selector, _next_id - 1);
} else {
@@ -941,7 +941,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
_selector->setDefaultSource(_current_index_id);
auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector));
- _current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num);
+ _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num());
LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num);
sourceList->append(_current_index_id, _current_index);
sourceList->setCurrentIndex(_current_index_id);
@@ -966,10 +966,10 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits
{
LockGuard lock(_index_update_lock);
- _current_serial_num = std::max(_current_serial_num, serialNum);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
}
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, _current_serial_num));
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num()));
FlushArgs args;
args.stats = stats;
// Ensure that all index thread tasks accessing memory index have completed.
@@ -984,7 +984,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
if (args._skippedEmptyLast && args._extraIndexes.empty()) {
// No memory index to flush, it was empty
LockGuard lock(_state_lock);
- _flush_serial_num = _current_serial_num;
+ _flush_serial_num = current_serial_num();
_lastFlushTime = vespalib::system_clock::now();
LOG(debug, "No memory index to flush. Update serial number and flush time to current: "
"flushSerialNum(%" PRIu64 "), lastFlushTime(%f)",
@@ -1014,7 +1014,7 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok
// XXX: Claims to have flushed memory index when starting fusion.
{
LockGuard lock(_index_update_lock);
- _current_serial_num = std::max(_current_serial_num, serialNum);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
}
FusionSpec spec;
@@ -1234,7 +1234,7 @@ IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serial
_selector->setSource(lid, _current_index_id);
_source_list->setSource(lid);
++_source_selector_changes;
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
}
void
@@ -1247,7 +1247,7 @@ IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum)
_source_list->setSource(lid);
}
_source_selector_changes += lids.size();
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_current_index->removeDocuments(std::move(lids));
}
@@ -1267,7 +1267,7 @@ IndexMaintainer::commit(vespalib::Gate& gate)
// only triggered via commit_and_wait()
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), _current_serial_num);
+ _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num());
}
void
@@ -1275,7 +1275,7 @@ IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone)
{
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_current_index->commit(onWriteDone, serialNum);
}
@@ -1284,7 +1284,7 @@ IndexMaintainer::heartBeat(SerialNum serialNum)
{
assert(_ctx.getThreadingService().index().isCurrentThread());
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
}
void
@@ -1293,7 +1293,7 @@ IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum)
assert(_ctx.getThreadingService().index().isCurrentThread());
LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum);
LockGuard lock(_index_update_lock);
- _current_serial_num = serialNum;
+ set_current_serial_num(serialNum);
_selector->compactLidSpace(lidLimit);
}
@@ -1313,7 +1313,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
pruneRemovedFields(schema, serialNum);
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, _current_serial_num));
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num()));
SetSchemaArgs args;
args._newSchema = schema;
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index b4bba209937..dfae5b4d643 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -18,6 +18,7 @@
#include <vespa/searchcorespi/flush/flushstats.h>
#include <vespa/searchlib/attribute/fixedsourceselector.h>
#include <vespa/searchlib/common/serialnum.h>
+#include <atomic>
namespace document { class Document; }
@@ -87,14 +88,14 @@ class IndexMaintainer : public IIndexManager,
// _selector is protected by SL + IUL
ISourceSelector::SP _selector;
ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread
- uint32_t _last_fusion_id; // Protected by SL + IUL
- uint32_t _next_id; // Protected by SL + IUL
- uint32_t _current_index_id; // Protected by SL + IUL
- IMemoryIndex::SP _current_index; // Protected by SL + IUL
- bool _flush_empty_current_index;
- SerialNum _current_serial_num;// Protected by IUL
- SerialNum _flush_serial_num; // Protected by SL
- vespalib::system_time _lastFlushTime; // Protected by SL
+ uint32_t _last_fusion_id; // Protected by SL + IUL
+ uint32_t _next_id; // Protected by SL + IUL
+ uint32_t _current_index_id; // Protected by SL + IUL
+ IMemoryIndex::SP _current_index; // Protected by SL + IUL
+ bool _flush_empty_current_index;
+ std::atomic<SerialNum> _current_serial_num;// Protected by IUL
+ SerialNum _flush_serial_num; // Protected by SL
+ vespalib::system_time _lastFlushTime; // Protected by SL
// Extra frozen memory indexes. This list is empty unless new
// memory index has been added by force (due to config change or
// data structure limitations).
@@ -263,6 +264,12 @@ class IndexMaintainer : public IIndexManager,
void commit_and_wait();
void commit(vespalib::Gate& gate);
void pruneRemovedFields(const Schema &schema, SerialNum serialNum);
+ [[nodiscard]] SerialNum current_serial_num() const noexcept {
+ return _current_serial_num.load(std::memory_order_relaxed);
+ }
+ void set_current_serial_num(SerialNum new_serial_num) noexcept {
+ _current_serial_num.store(new_serial_num, std::memory_order_relaxed);
+ }
public:
IndexMaintainer(const IndexMaintainer &) = delete;
@@ -270,7 +277,7 @@ public:
IndexMaintainer(const IndexMaintainerConfig &config,
const IndexMaintainerContext &context,
IIndexMaintainerOperations &operations);
- ~IndexMaintainer();
+ ~IndexMaintainer() override;
/**
* Starts a new MemoryIndex, and dumps the previous one to disk.
@@ -333,7 +340,7 @@ public:
void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override;
SerialNum getCurrentSerialNum() const override {
- return _current_serial_num;
+ return _current_serial_num.load(std::memory_order_relaxed);
}
SerialNum getFlushedSerialNum() const override {