diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-09 15:18:04 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-09 15:18:04 +0000 |
commit | 9d05cfd466959c68b9b633b876016678c253fed6 (patch) | |
tree | 2cc8155f2c83cd76024cbd92b6c421b53af9abe0 | |
parent | 7743992d1dd55b635a297ad97546d4fbfd5ce23f (diff) |
Make current serial number in IndexMaintainer atomic
-rw-r--r-- | searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp | 30 | ||||
-rw-r--r-- | searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h | 27 |
2 files changed, 32 insertions, 25 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 20aba93d4d7..2e53db3bd87 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -490,7 +490,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) args->old_absolute_id = _current_index_id + _last_fusion_id; args->old_source_list = _source_list; string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id)); - args->flush_serial_num = _current_serial_num; + args->flush_serial_num = current_serial_num(); { LockGuard lock(_index_update_lock); // Handover of extra memory indexes to flush @@ -795,7 +795,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) args._oldSourceList = _source_list; // Delay destruction uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId)); - SerialNum freezeSerialNum = _current_serial_num; + SerialNum freezeSerialNum = current_serial_num(); bool dropEmptyLast = false; SaveInfo::UP saveInfo; @@ -921,7 +921,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir); _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir); - _current_serial_num = _flush_serial_num; + set_current_serial_num(_flush_serial_num); const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir); _selector = FixedSourceSelector::load(selector, _next_id - 1); } else { @@ -941,7 +941,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); _selector->setDefaultSource(_current_index_id); auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector)); - _current_index = operations.createMemoryIndex(_schema, *sourceList, _current_serial_num); + _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num()); LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num); sourceList->append(_current_index_id, _current_index); sourceList->setCurrentIndex(_current_index_id); @@ -966,10 +966,10 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits { LockGuard lock(_index_update_lock); - _current_serial_num = std::max(_current_serial_num, serialNum); + set_current_serial_num(std::max(current_serial_num(), serialNum)); } - IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, _current_serial_num)); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num())); FlushArgs args; args.stats = stats; // Ensure that all index thread tasks accessing memory index have completed. @@ -984,7 +984,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat if (args._skippedEmptyLast && args._extraIndexes.empty()) { // No memory index to flush, it was empty LockGuard lock(_state_lock); - _flush_serial_num = _current_serial_num; + _flush_serial_num = current_serial_num(); _lastFlushTime = vespalib::system_clock::now(); LOG(debug, "No memory index to flush. Update serial number and flush time to current: " "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)", @@ -1014,7 +1014,7 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok // XXX: Claims to have flushed memory index when starting fusion. { LockGuard lock(_index_update_lock); - _current_serial_num = std::max(_current_serial_num, serialNum); + set_current_serial_num(std::max(current_serial_num(), serialNum)); } FusionSpec spec; @@ -1234,7 +1234,7 @@ IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serial _selector->setSource(lid, _current_index_id); _source_list->setSource(lid); ++_source_selector_changes; - _current_serial_num = serialNum; + set_current_serial_num(serialNum); } void @@ -1247,7 +1247,7 @@ IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum) _source_list->setSource(lid); } _source_selector_changes += lids.size(); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _current_index->removeDocuments(std::move(lids)); } @@ -1267,7 +1267,7 @@ IndexMaintainer::commit(vespalib::Gate& gate) // only triggered via commit_and_wait() assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), _current_serial_num); + _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num()); } void @@ -1275,7 +1275,7 @@ IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone) { assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _current_index->commit(onWriteDone, serialNum); } @@ -1284,7 +1284,7 @@ IndexMaintainer::heartBeat(SerialNum serialNum) { assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); } void @@ -1293,7 +1293,7 @@ IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum) assert(_ctx.getThreadingService().index().isCurrentThread()); LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum); LockGuard lock(_index_update_lock); - _current_serial_num = serialNum; + set_current_serial_num(serialNum); _selector->compactLidSpace(lidLimit); } @@ -1313,7 +1313,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) { assert(_ctx.getThreadingService().master().isCurrentThread()); pruneRemovedFields(schema, serialNum); - IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, _current_serial_num)); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num())); SetSchemaArgs args; args._newSchema = schema; diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index b4bba209937..dfae5b4d643 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -18,6 +18,7 @@ #include <vespa/searchcorespi/flush/flushstats.h> #include <vespa/searchlib/attribute/fixedsourceselector.h> #include <vespa/searchlib/common/serialnum.h> +#include <atomic> namespace document { class Document; } @@ -87,14 +88,14 @@ class IndexMaintainer : public IIndexManager, // _selector is protected by SL + IUL ISourceSelector::SP _selector; ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread - uint32_t _last_fusion_id; // Protected by SL + IUL - uint32_t _next_id; // Protected by SL + IUL - uint32_t _current_index_id; // Protected by SL + IUL - IMemoryIndex::SP _current_index; // Protected by SL + IUL - bool _flush_empty_current_index; - SerialNum _current_serial_num;// Protected by IUL - SerialNum _flush_serial_num; // Protected by SL - vespalib::system_time _lastFlushTime; // Protected by SL + uint32_t _last_fusion_id; // Protected by SL + IUL + uint32_t _next_id; // Protected by SL + IUL + uint32_t _current_index_id; // Protected by SL + IUL + IMemoryIndex::SP _current_index; // Protected by SL + IUL + bool _flush_empty_current_index; + std::atomic<SerialNum> _current_serial_num;// Protected by IUL + SerialNum _flush_serial_num; // Protected by SL + vespalib::system_time _lastFlushTime; // Protected by SL // Extra frozen memory indexes. This list is empty unless new // memory index has been added by force (due to config change or // data structure limitations). @@ -263,6 +264,12 @@ class IndexMaintainer : public IIndexManager, void commit_and_wait(); void commit(vespalib::Gate& gate); void pruneRemovedFields(const Schema &schema, SerialNum serialNum); + [[nodiscard]] SerialNum current_serial_num() const noexcept { + return _current_serial_num.load(std::memory_order_relaxed); + } + void set_current_serial_num(SerialNum new_serial_num) noexcept { + _current_serial_num.store(new_serial_num, std::memory_order_relaxed); + } public: IndexMaintainer(const IndexMaintainer &) = delete; @@ -270,7 +277,7 @@ public: IndexMaintainer(const IndexMaintainerConfig &config, const IndexMaintainerContext &context, IIndexMaintainerOperations &operations); - ~IndexMaintainer(); + ~IndexMaintainer() override; /** * Starts a new MemoryIndex, and dumps the previous one to disk. @@ -333,7 +340,7 @@ public: void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override; SerialNum getCurrentSerialNum() const override { - return _current_serial_num; + return _current_serial_num.load(std::memory_order_relaxed); } SerialNum getFlushedSerialNum() const override { |