diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-11 14:15:08 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-11 14:15:08 +0100 |
commit | fb59b97efa7189d09d021cec2c7e287328553405 (patch) | |
tree | 9a53e928620e6f0b9785d603c61f2c8dd8ab54d8 | |
parent | 5f896b2f66b395172c29fe7227d0f8c5efd8cb9d (diff) |
Remove full sync of threading service in IndexMaintainer.
5 files changed, 17 insertions, 23 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 839618afb4d..19469d59d1b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -16,6 +16,7 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/array.hpp> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/time.h> @@ -940,9 +941,8 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, _current_serial_num)); FlushArgs args; args.stats = stats; - scheduleCommit(); // Ensure that all index thread tasks accessing memory index have completed. - _ctx.getThreadingService().sync(); + commit_and_wait(); // Call reconfig closure for this change auto configure = makeLambdaConfigure([this, argsP=&args, indexP=&new_index]() { return doneInitFlush(argsP, indexP); @@ -1197,20 +1197,22 @@ IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum) } void -IndexMaintainer::scheduleCommit() +IndexMaintainer::commit_and_wait() { assert(_ctx.getThreadingService().master().isCurrentThread()); - _ctx.getThreadingService().index().execute(makeLambdaTask([this]() { commit(); })); + vespalib::Gate gate; + _ctx.getThreadingService().index().execute(makeLambdaTask([this, &gate]() { commit(gate); })); + // Ensure that all index thread tasks accessing memory index have completed. + gate.await(); } void -IndexMaintainer::commit() +IndexMaintainer::commit(vespalib::Gate& gate) { - // only triggered via scheduleCommit() + // only triggered via commit_and_wait() assert(_ctx.getThreadingService().index().isCurrentThread()); LockGuard lock(_index_update_lock); - _current_index->commit({}, _current_serial_num); - // caller calls _ctx.getThreadingService().sync() + _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), _current_serial_num); } void @@ -1260,9 +1262,8 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) SetSchemaArgs args; args._newSchema = schema; - scheduleCommit(); // Ensure that all index thread tasks accessing memory index have completed. - _ctx.getThreadingService().sync(); + commit_and_wait(); // Everything should be quiet now. doneSetSchema(args, new_index); // Source collection has now changed, caller must reconfigure further @@ -1287,8 +1288,7 @@ IndexMaintainer::pruneRemovedFields(const Schema &schema, SerialNum serialNum) new_source_list = std::make_shared<IndexCollection>(_selector, *_source_list); } if (reopenDiskIndexes(*new_source_list)) { - scheduleCommit(); - _ctx.getThreadingService().sync(); + commit_and_wait(); // Everything should be quiet now. LockGuard state_lock(_state_lock); LockGuard lock(_new_search_lock); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 55f05410108..6e4eb32ee50 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -22,6 +22,7 @@ namespace document { class Document; } namespace search::common { class FileHeaderContext; } +namespace vespalib { class Gate; } namespace searchcorespi::index { @@ -258,8 +259,8 @@ class IndexMaintainer : public IIndexManager, bool reconfigure(std::unique_ptr<Configure> configure); void warmupDone(ISearchableIndexCollection::SP current) override; bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive); - void scheduleCommit(); - void commit(); + void commit_and_wait(); + void commit(vespalib::Gate& gate); void pruneRemovedFields(const Schema &schema, SerialNum serialNum); public: diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp index c0623aec9df..fdb2de8fb59 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp @@ -54,8 +54,7 @@ DocumentInverter::DocumentInverter(DocumentInverterContext& context) DocumentInverter::~DocumentInverter() { - _context.get_invert_threads().sync_all(); - _context.get_push_threads().sync_all(); + wait_for_zero_ref_count(); } void diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp index a9d935aee3b..330320d5047 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp @@ -72,11 +72,7 @@ MemoryIndex::MemoryIndex(const Schema& schema, { } -MemoryIndex::~MemoryIndex() -{ - _invertThreads.sync_all(); - _pushThreads.sync_all(); -} +MemoryIndex::~MemoryIndex() = default; void MemoryIndex::insertDocument(uint32_t docId, const document::Document &doc, OnWriteDoneType on_write_done) diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h index 6953aee7875..dc1b5d8060d 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h @@ -121,8 +121,6 @@ public: * Commits the inserts and removes since the last commit, making them searchable. * * When commit is completed, 'on_write_done' goes out of scope, scheduling completion callback. - * - * Callers can call pushThreads.sync() to wait for push completion. */ void commit(OnWriteDoneType on_write_done); |