diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-03 22:00:25 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-03 22:00:25 +0100 |
commit | 8e996033594317202d92c5f9bf6141570fe7f6f0 (patch) | |
tree | daee37ed45ad8d2fdc348d0b1b3bddf5e380e01c /searchlib | |
parent | 9c3f28fbbdbf5b8be36f994b38f4980ebe7c3e8d (diff) | |
Add DocumentInverterCollection.
Diffstat (limited to 'searchlib')
9 files changed, 125 insertions, 65 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index fba488e78aa..415d4d34d0d 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -19,6 +19,8 @@ #include <vespa/vespalib/btree/btreenodeallocator.hpp> #include <vespa/vespalib/btree/btreeroot.hpp> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <gtest/gtest.h> @@ -75,7 +77,9 @@ namespace { void myPushDocument(DocumentInverter &inv) { - inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); + vespalib::Gate gate; + inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate)); + gate.await(); } } @@ -329,9 +333,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire doc = make_doc10(b); inv.invertDocument(10, *doc); - invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync_all(); b.startDocument("id:ns:searchdocument::11"). startIndexField("f3"). @@ -339,9 +341,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(11, *doc); - invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync_all(); b.startDocument("id:ns:searchdocument::12"). startIndexField("f3"). 
@@ -349,9 +349,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(12, *doc); - invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync_all(); IndexBuilder ib(schema); vespalib::string dump2dir = prefix + "dump2"; @@ -469,9 +467,7 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng DocumentInverter inv(inv_context); inv.invertDocument(10, *make_doc10(b)); - invertThreads->sync_all(); myPushDocument(inv); - pushThreads->sync_all(); IndexBuilder ib(_schema); TuneFileIndexing tuneFileIndexing; diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index a94e9cf5320..7b52eec78a6 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -19,6 +19,8 @@ #include <vespa/searchlib/test/memoryindex/wrap_inserter.h> #include <vespa/vespalib/btree/btreenodeallocator.hpp> #include <vespa/vespalib/btree/btreeroot.hpp> +#include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/gtest/gtest.h> @@ -412,12 +414,12 @@ public: MyInserter::~MyInserter() = default; void -myremove(uint32_t docId, DocumentInverter &inv, - ISequencedTaskExecutor &invertThreads) +myremove(uint32_t docId, DocumentInverter &inv) { inv.removeDocument(docId); - invertThreads.sync_all(); - inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); + vespalib::Gate gate; + inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate)); + gate.await(); } class MyDrainRemoves : IFieldIndexRemoveListener { @@ -443,7 +445,9 @@ public: void myPushDocument(DocumentInverter &inv) { - inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>()); + vespalib::Gate gate; + 
inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate)); + gate.await(); } const FeatureStore * @@ -953,9 +957,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::20"); _b.startIndexField("f0"). @@ -963,9 +965,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(20, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::30"); _b.startIndexField("f0"). @@ -994,9 +994,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(30, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::40"); _b.startIndexField("f0"). @@ -1005,9 +1003,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(40, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::999"); _b.startIndexField("f0"). 
@@ -1035,12 +1031,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) doc = _b.endDocument(); for (uint32_t docId = 10000; docId < 20000; ++docId) { _inv.invertDocument(docId, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); } - _pushThreads->sync_all(); DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic); LOG(info, "Before feature compaction: allocElems=%zu, usedElems=%zu" @@ -1152,17 +1145,13 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo _b.startIndexField("f1").addStr("a").addStr("c").endField(); Document::UP doc1 = _b.endDocument(); _inv.invertDocument(1, *doc1.get()); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); _b.startDocument("id:ns:searchdocument::2"); _b.startIndexField("f0").addStr("b").addStr("c").endField(); Document::UP doc2 = _b.endDocument(); _inv.invertDocument(2, *doc2.get()); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); EXPECT_TRUE(assertPostingList("[1]", find("a", 0))); EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0))); @@ -1170,8 +1159,7 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo EXPECT_TRUE(assertPostingList("[1]", find("a", 1))); EXPECT_TRUE(assertPostingList("[1]", find("c", 1))); - myremove(1, _inv, *_invertThreads); - _pushThreads->sync_all(); + myremove(1, _inv); EXPECT_TRUE(assertPostingList("[]", find("a", 0))); EXPECT_TRUE(assertPostingList("[2]", find("b", 0))); @@ -1321,11 +1309,8 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); - SimpleMatchData match_data; { uint32_t fieldId = _schema.getIndexFieldId("iu"); @@ -1397,11 +1382,8 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, 
*doc); - _invertThreads->sync_all(); myPushDocument(_inv); - _pushThreads->sync_all(); - SimpleMatchData match_data; uint32_t fieldId = _schema.getIndexFieldId("f0"); { @@ -1475,8 +1457,7 @@ struct RemoverTest : public FieldIndexCollectionTest { void remove(uint32_t docId) { DocumentInverterContext inv_context(schema, *_invertThreads, *_pushThreads, fic); DocumentInverter inv(inv_context); - myremove(docId, inv, *_invertThreads); - _pushThreads->sync_all(); + myremove(docId, inv); EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover(). getStore().get(docId).valid()); } diff --git a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt index 021e5f9cab8..e845201f5c6 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(searchlib_memoryindex OBJECT SOURCES compact_words_store.cpp document_inverter.cpp + document_inverter_collection.cpp document_inverter_context.cpp feature_store.cpp field_index.cpp diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp index f42cfa25877..572df707f28 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp @@ -6,12 +6,14 @@ #include "field_inverter.h" #include "url_field_inverter.h" #include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/retain_guard.h> namespace search::memoryindex { using document::Document; using index::Schema; using search::index::FieldLengthCalculator; +using vespalib::RetainGuard; DocumentInverter::DocumentInverter(DocumentInverterContext& context) : _context(context), @@ -113,15 +115,24 @@ void DocumentInverter::pushDocuments(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone) { uint32_t fieldId = 0; + auto retain = 
std::make_shared<RetainGuard>(_ref_count); + auto& invert_threads = _context.get_invert_threads(); auto& push_threads = _context.get_push_threads(); for (auto &inverter : _inverters) { - push_threads.execute(fieldId,[inverter(inverter.get()), onWriteDone]() { - inverter->applyRemoves(); - inverter->pushDocuments(); - }); + auto invert_id = invert_threads.getExecutorId(fieldId); + auto push_id = push_threads.getExecutorId(fieldId); + invert_threads.execute(invert_id, + [&push_threads, push_id, inverter(inverter.get()), retain, onWriteDone] () mutable + { + push_threads.execute(push_id, + [inverter, retain(std::move(retain)), onWriteDone(std::move(onWriteDone))]() + { + inverter->applyRemoves(); + inverter->pushDocuments(); + }); + }); ++fieldId; } } } - diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h index cce6eda615d..0a128ec6abd 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h @@ -2,6 +2,8 @@ #pragma once +#include <vespa/vespalib/util/monitored_refcount.h> + #include <cstdint> #include <memory> #include <vector> @@ -41,6 +43,7 @@ private: std::vector<std::unique_ptr<FieldInverter>> _inverters; std::vector<std::unique_ptr<UrlFieldInverter>> _urlInverters; + vespalib::MonitoredRefCount _ref_count; public: /** @@ -92,6 +95,8 @@ public: const std::vector<std::unique_ptr<FieldInverter> > & getInverters() const { return _inverters; } uint32_t getNumFields() const { return _inverters.size(); } + void wait_for_zero_ref_count() { _ref_count.waitForZeroRefCount(); } + bool has_zero_ref_count() { return _ref_count.has_zero_ref_count(); } }; } diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp new file mode 100644 index 00000000000..b316b4cc360 --- /dev/null +++ 
b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp @@ -0,0 +1,45 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "document_inverter_collection.h" +#include "document_inverter.h" +#include <cassert> + +namespace search::memoryindex { + +DocumentInverterCollection::DocumentInverterCollection(DocumentInverterContext& context) + : _context(context), + _free_inverters(), + _inflight_inverters(), + _active_inverter(std::make_unique<DocumentInverter>(_context)), + _num_inverters(1), + _max_inverters(4) +{ +} + +DocumentInverterCollection::~DocumentInverterCollection() = default; + +void +DocumentInverterCollection::switch_active_inverter() +{ + _inflight_inverters.emplace_back(std::move(_active_inverter)); + while (!_inflight_inverters.empty() && _inflight_inverters.front()->has_zero_ref_count()) { + _free_inverters.emplace_back(std::move(_inflight_inverters.front())); + _inflight_inverters.pop_front(); + } + if (!_free_inverters.empty()) { + _active_inverter = std::move(_free_inverters.back()); + _free_inverters.pop_back(); + return; + } + if (_num_inverters >= _max_inverters) { + assert(!_inflight_inverters.empty()); + _active_inverter = std::move(_inflight_inverters.front()); + _inflight_inverters.pop_front(); + _active_inverter->wait_for_zero_ref_count(); + return; + } + _active_inverter = std::make_unique<DocumentInverter>(_context); + ++_num_inverters; +} + +} diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h new file mode 100644 index 00000000000..ae766381cf2 --- /dev/null +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include <deque> +#include <memory> +#include <vector> + +namespace search::memoryindex { + +class DocumentInverter; +class DocumentInverterContext; + +/* + * Class containing the document inverters used by a memory index. + */ +class DocumentInverterCollection { + DocumentInverterContext& _context; + std::vector<std::unique_ptr<DocumentInverter>> _free_inverters; + std::deque<std::unique_ptr<DocumentInverter>> _inflight_inverters; + std::unique_ptr<DocumentInverter> _active_inverter; + uint32_t _num_inverters; + uint32_t _max_inverters; +public: + DocumentInverterCollection(DocumentInverterContext& context); + ~DocumentInverterCollection(); + DocumentInverter& get_active_inverter() noexcept { return *_active_inverter; } + void switch_active_inverter(); +}; + +} diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp index 1e59d7ff83b..25b36a3e245 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "document_inverter.h" +#include "document_inverter_collection.h" #include "document_inverter_context.h" #include "field_index_collection.h" #include "memory_index.h" @@ -59,9 +60,7 @@ MemoryIndex::MemoryIndex(const Schema& schema, _pushThreads(pushThreads), _fieldIndexes(std::make_unique<FieldIndexCollection>(_schema, inspector)), _inverter_context(std::make_unique<DocumentInverterContext>(_schema, _invertThreads, _pushThreads, *_fieldIndexes)), - _inverter0(std::make_unique<DocumentInverter>(*_inverter_context)), - _inverter1(std::make_unique<DocumentInverter>(*_inverter_context)), - _inverter(_inverter0.get()), + _inverters(std::make_unique<DocumentInverterCollection>(*_inverter_context)), _frozen(false), _maxDocId(0), // docId 0 is reserved _numDocs(0), @@ -88,7 +87,8 @@ MemoryIndex::insertDocument(uint32_t docId, const document::Document &doc) return; } updateMaxDocId(docId); - _inverter->invertDocument(docId, doc); + auto& inverter = _inverters->get_active_inverter(); + inverter.invertDocument(docId, doc); if (_indexedDocs.insert(docId).second) { incNumDocs(); } @@ -108,22 +108,16 @@ MemoryIndex::removeDocuments(LidVector lids) decNumDocs(); } } - _inverter->removeDocuments(std::move(lids)); + auto& inverter = _inverters->get_active_inverter(); + inverter.removeDocuments(std::move(lids)); } void MemoryIndex::commit(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone) { - _invertThreads.sync_all(); // drain inverting into this inverter - _pushThreads.sync_all(); // drain use of other inverter - _inverter->pushDocuments(onWriteDone); - flipInverter(); -} - -void -MemoryIndex::flipInverter() -{ - _inverter = (_inverter != _inverter0.get()) ? 
_inverter0.get(): _inverter1.get(); + auto& inverter = _inverters->get_active_inverter(); + inverter.pushDocuments(onWriteDone); + _inverters->switch_active_inverter(); } void diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h index 1ea9f34b48c..760a4ecfb0f 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h @@ -20,7 +20,7 @@ namespace document { class Document; } namespace search::memoryindex { -class DocumentInverter; +class DocumentInverterCollection; class DocumentInverterContext; class FieldIndexCollection; @@ -48,9 +48,7 @@ private: ISequencedTaskExecutor &_pushThreads; std::unique_ptr<FieldIndexCollection> _fieldIndexes; std::unique_ptr<DocumentInverterContext> _inverter_context; - std::unique_ptr<DocumentInverter> _inverter0; - std::unique_ptr<DocumentInverter> _inverter1; - DocumentInverter *_inverter; + std::unique_ptr<DocumentInverterCollection> _inverters; bool _frozen; uint32_t _maxDocId; uint32_t _numDocs; @@ -79,8 +77,6 @@ private: } } - void flipInverter(); - public: using UP = std::unique_ptr<MemoryIndex>; using SP = std::shared_ptr<MemoryIndex>; |