aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-03 22:00:25 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-03 22:00:25 +0100
commit8e996033594317202d92c5f9bf6141570fe7f6f0 (patch)
treedaee37ed45ad8d2fdc348d0b1b3bddf5e380e01c /searchlib/src
parent9c3f28fbbdbf5b8be36f994b38f4980ebe7c3e8d (diff)
Add DocumentInverterCollection.
Diffstat (limited to 'searchlib/src')
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp14
-rw-r--r--searchlib/src/tests/memoryindex/field_index/field_index_test.cpp41
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter.h5
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h31
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp24
-rw-r--r--searchlib/src/vespa/searchlib/memoryindex/memory_index.h8
9 files changed, 125 insertions, 65 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index fba488e78aa..415d4d34d0d 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -19,6 +19,8 @@
#include <vespa/vespalib/btree/btreenodeallocator.hpp>
#include <vespa/vespalib/btree/btreeroot.hpp>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <gtest/gtest.h>
@@ -75,7 +77,9 @@ namespace {
void
myPushDocument(DocumentInverter &inv)
{
- inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>());
+ vespalib::Gate gate;
+ inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate));
+ gate.await();
}
}
@@ -329,9 +333,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
doc = make_doc10(b);
inv.invertDocument(10, *doc);
- invertThreads->sync_all();
myPushDocument(inv);
- pushThreads->sync_all();
b.startDocument("id:ns:searchdocument::11").
startIndexField("f3").
@@ -339,9 +341,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(11, *doc);
- invertThreads->sync_all();
myPushDocument(inv);
- pushThreads->sync_all();
b.startDocument("id:ns:searchdocument::12").
startIndexField("f3").
@@ -349,9 +349,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
endField();
doc = b.endDocument();
inv.invertDocument(12, *doc);
- invertThreads->sync_all();
myPushDocument(inv);
- pushThreads->sync_all();
IndexBuilder ib(schema);
vespalib::string dump2dir = prefix + "dump2";
@@ -469,9 +467,7 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
DocumentInverter inv(inv_context);
inv.invertDocument(10, *make_doc10(b));
- invertThreads->sync_all();
myPushDocument(inv);
- pushThreads->sync_all();
IndexBuilder ib(_schema);
TuneFileIndexing tuneFileIndexing;
diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
index a94e9cf5320..7b52eec78a6 100644
--- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
@@ -19,6 +19,8 @@
#include <vespa/searchlib/test/memoryindex/wrap_inserter.h>
#include <vespa/vespalib/btree/btreenodeallocator.hpp>
#include <vespa/vespalib/btree/btreeroot.hpp>
+#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -412,12 +414,12 @@ public:
MyInserter::~MyInserter() = default;
void
-myremove(uint32_t docId, DocumentInverter &inv,
- ISequencedTaskExecutor &invertThreads)
+myremove(uint32_t docId, DocumentInverter &inv)
{
inv.removeDocument(docId);
- invertThreads.sync_all();
- inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>());
+ vespalib::Gate gate;
+ inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate));
+ gate.await();
}
class MyDrainRemoves : IFieldIndexRemoveListener {
@@ -443,7 +445,9 @@ public:
void
myPushDocument(DocumentInverter &inv)
{
- inv.pushDocuments(std::shared_ptr<vespalib::IDestructorCallback>());
+ vespalib::Gate gate;
+ inv.pushDocuments(std::make_shared<vespalib::GateCallback>(gate));
+ gate.await();
}
const FeatureStore *
@@ -953,9 +957,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
_b.startDocument("id:ns:searchdocument::20");
_b.startIndexField("f0").
@@ -963,9 +965,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(20, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
_b.startDocument("id:ns:searchdocument::30");
_b.startIndexField("f0").
@@ -994,9 +994,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(30, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
_b.startDocument("id:ns:searchdocument::40");
_b.startIndexField("f0").
@@ -1005,9 +1003,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(40, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
_b.startDocument("id:ns:searchdocument::999");
_b.startIndexField("f0").
@@ -1035,12 +1031,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working)
doc = _b.endDocument();
for (uint32_t docId = 10000; docId < 20000; ++docId) {
_inv.invertDocument(docId, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
}
- _pushThreads->sync_all();
DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic);
LOG(info,
"Before feature compaction: allocElems=%zu, usedElems=%zu"
@@ -1152,17 +1145,13 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
_b.startIndexField("f1").addStr("a").addStr("c").endField();
Document::UP doc1 = _b.endDocument();
_inv.invertDocument(1, *doc1.get());
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
_b.startDocument("id:ns:searchdocument::2");
_b.startIndexField("f0").addStr("b").addStr("c").endField();
Document::UP doc2 = _b.endDocument();
_inv.invertDocument(2, *doc2.get());
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
EXPECT_TRUE(assertPostingList("[1]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0)));
@@ -1170,8 +1159,7 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo
EXPECT_TRUE(assertPostingList("[1]", find("a", 1)));
EXPECT_TRUE(assertPostingList("[1]", find("c", 1)));
- myremove(1, _inv, *_invertThreads);
- _pushThreads->sync_all();
+ myremove(1, _inv);
EXPECT_TRUE(assertPostingList("[]", find("a", 0)));
EXPECT_TRUE(assertPostingList("[2]", find("b", 0)));
@@ -1321,11 +1309,8 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
-
SimpleMatchData match_data;
{
uint32_t fieldId = _schema.getIndexFieldId("iu");
@@ -1397,11 +1382,8 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working)
endField();
doc = _b.endDocument();
_inv.invertDocument(10, *doc);
- _invertThreads->sync_all();
myPushDocument(_inv);
- _pushThreads->sync_all();
-
SimpleMatchData match_data;
uint32_t fieldId = _schema.getIndexFieldId("f0");
{
@@ -1475,8 +1457,7 @@ struct RemoverTest : public FieldIndexCollectionTest {
void remove(uint32_t docId) {
DocumentInverterContext inv_context(schema, *_invertThreads, *_pushThreads, fic);
DocumentInverter inv(inv_context);
- myremove(docId, inv, *_invertThreads);
- _pushThreads->sync_all();
+ myremove(docId, inv);
EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover().
getStore().get(docId).valid());
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
index 021e5f9cab8..e845201f5c6 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/memoryindex/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(searchlib_memoryindex OBJECT
SOURCES
compact_words_store.cpp
document_inverter.cpp
+ document_inverter_collection.cpp
document_inverter_context.cpp
feature_store.cpp
field_index.cpp
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
index f42cfa25877..572df707f28 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp
@@ -6,12 +6,14 @@
#include "field_inverter.h"
#include "url_field_inverter.h"
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <vespa/vespalib/util/retain_guard.h>
namespace search::memoryindex {
using document::Document;
using index::Schema;
using search::index::FieldLengthCalculator;
+using vespalib::RetainGuard;
DocumentInverter::DocumentInverter(DocumentInverterContext& context)
: _context(context),
@@ -113,15 +115,24 @@ void
DocumentInverter::pushDocuments(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone)
{
uint32_t fieldId = 0;
+ auto retain = std::make_shared<RetainGuard>(_ref_count);
+ auto& invert_threads = _context.get_invert_threads();
auto& push_threads = _context.get_push_threads();
for (auto &inverter : _inverters) {
- push_threads.execute(fieldId,[inverter(inverter.get()), onWriteDone]() {
- inverter->applyRemoves();
- inverter->pushDocuments();
- });
+ auto invert_id = invert_threads.getExecutorId(fieldId);
+ auto push_id = push_threads.getExecutorId(fieldId);
+ invert_threads.execute(invert_id,
+ [&push_threads, push_id, inverter(inverter.get()), retain, onWriteDone] () mutable
+ {
+ push_threads.execute(push_id,
+ [inverter, retain(std::move(retain)), onWriteDone(std::move(onWriteDone))]()
+ {
+ inverter->applyRemoves();
+ inverter->pushDocuments();
+ });
+ });
++fieldId;
}
}
}
-
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
index cce6eda615d..0a128ec6abd 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h
@@ -2,6 +2,8 @@
#pragma once
+#include <vespa/vespalib/util/monitored_refcount.h>
+
#include <cstdint>
#include <memory>
#include <vector>
@@ -41,6 +43,7 @@ private:
std::vector<std::unique_ptr<FieldInverter>> _inverters;
std::vector<std::unique_ptr<UrlFieldInverter>> _urlInverters;
+ vespalib::MonitoredRefCount _ref_count;
public:
/**
@@ -92,6 +95,8 @@ public:
const std::vector<std::unique_ptr<FieldInverter> > & getInverters() const { return _inverters; }
uint32_t getNumFields() const { return _inverters.size(); }
+ void wait_for_zero_ref_count() { _ref_count.waitForZeroRefCount(); }
+ bool has_zero_ref_count() { return _ref_count.has_zero_ref_count(); }
};
}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp
new file mode 100644
index 00000000000..b316b4cc360
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.cpp
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "document_inverter_collection.h"
+#include "document_inverter.h"
+#include <cassert>
+
+namespace search::memoryindex {
+
+DocumentInverterCollection::DocumentInverterCollection(DocumentInverterContext& context)
+ : _context(context),
+ _free_inverters(),
+ _inflight_inverters(),
+ _active_inverter(std::make_unique<DocumentInverter>(_context)),
+ _num_inverters(1),
+ _max_inverters(4)
+{
+}
+
+DocumentInverterCollection::~DocumentInverterCollection() = default;
+
+void
+DocumentInverterCollection::switch_active_inverter()
+{
+ _inflight_inverters.emplace_back(std::move(_active_inverter));
+ while (!_inflight_inverters.empty() && _inflight_inverters.front()->has_zero_ref_count()) {
+ _free_inverters.emplace_back(std::move(_inflight_inverters.front()));
+ _inflight_inverters.pop_front();
+ }
+ if (!_free_inverters.empty()) {
+ _active_inverter = std::move(_free_inverters.back());
+ _free_inverters.pop_back();
+ return;
+ }
+ if (_num_inverters >= _max_inverters) {
+ assert(!_inflight_inverters.empty());
+ _active_inverter = std::move(_inflight_inverters.front());
+ _inflight_inverters.pop_front();
+ _active_inverter->wait_for_zero_ref_count();
+ return;
+ }
+ _active_inverter = std::make_unique<DocumentInverter>(_context);
+ ++_num_inverters;
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h
new file mode 100644
index 00000000000..ae766381cf2
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter_collection.h
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <deque>
+#include <memory>
+#include <vector>
+
+namespace search::memoryindex {
+
+class DocumentInverter;
+class DocumentInverterContext;
+
+/*
+ * Class containing the document inverters used by a memory index.
+ */
+class DocumentInverterCollection {
+ DocumentInverterContext& _context;
+ std::vector<std::unique_ptr<DocumentInverter>> _free_inverters;
+ std::deque<std::unique_ptr<DocumentInverter>> _inflight_inverters;
+ std::unique_ptr<DocumentInverter> _active_inverter;
+ uint32_t _num_inverters;
+ uint32_t _max_inverters;
+public:
+ DocumentInverterCollection(DocumentInverterContext& context);
+ ~DocumentInverterCollection();
+ DocumentInverter& get_active_inverter() noexcept { return *_active_inverter; }
+ void switch_active_inverter();
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
index 1e59d7ff83b..25b36a3e245 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
+++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "document_inverter.h"
+#include "document_inverter_collection.h"
#include "document_inverter_context.h"
#include "field_index_collection.h"
#include "memory_index.h"
@@ -59,9 +60,7 @@ MemoryIndex::MemoryIndex(const Schema& schema,
_pushThreads(pushThreads),
_fieldIndexes(std::make_unique<FieldIndexCollection>(_schema, inspector)),
_inverter_context(std::make_unique<DocumentInverterContext>(_schema, _invertThreads, _pushThreads, *_fieldIndexes)),
- _inverter0(std::make_unique<DocumentInverter>(*_inverter_context)),
- _inverter1(std::make_unique<DocumentInverter>(*_inverter_context)),
- _inverter(_inverter0.get()),
+ _inverters(std::make_unique<DocumentInverterCollection>(*_inverter_context)),
_frozen(false),
_maxDocId(0), // docId 0 is reserved
_numDocs(0),
@@ -88,7 +87,8 @@ MemoryIndex::insertDocument(uint32_t docId, const document::Document &doc)
return;
}
updateMaxDocId(docId);
- _inverter->invertDocument(docId, doc);
+ auto& inverter = _inverters->get_active_inverter();
+ inverter.invertDocument(docId, doc);
if (_indexedDocs.insert(docId).second) {
incNumDocs();
}
@@ -108,22 +108,16 @@ MemoryIndex::removeDocuments(LidVector lids)
decNumDocs();
}
}
- _inverter->removeDocuments(std::move(lids));
+ auto& inverter = _inverters->get_active_inverter();
+ inverter.removeDocuments(std::move(lids));
}
void
MemoryIndex::commit(const std::shared_ptr<vespalib::IDestructorCallback> &onWriteDone)
{
- _invertThreads.sync_all(); // drain inverting into this inverter
- _pushThreads.sync_all(); // drain use of other inverter
- _inverter->pushDocuments(onWriteDone);
- flipInverter();
-}
-
-void
-MemoryIndex::flipInverter()
-{
- _inverter = (_inverter != _inverter0.get()) ? _inverter0.get(): _inverter1.get();
+ auto& inverter = _inverters->get_active_inverter();
+ inverter.pushDocuments(onWriteDone);
+ _inverters->switch_active_inverter();
}
void
diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
index 1ea9f34b48c..760a4ecfb0f 100644
--- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
+++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h
@@ -20,7 +20,7 @@ namespace document { class Document; }
namespace search::memoryindex {
-class DocumentInverter;
+class DocumentInverterCollection;
class DocumentInverterContext;
class FieldIndexCollection;
@@ -48,9 +48,7 @@ private:
ISequencedTaskExecutor &_pushThreads;
std::unique_ptr<FieldIndexCollection> _fieldIndexes;
std::unique_ptr<DocumentInverterContext> _inverter_context;
- std::unique_ptr<DocumentInverter> _inverter0;
- std::unique_ptr<DocumentInverter> _inverter1;
- DocumentInverter *_inverter;
+ std::unique_ptr<DocumentInverterCollection> _inverters;
bool _frozen;
uint32_t _maxDocId;
uint32_t _numDocs;
@@ -79,8 +77,6 @@ private:
}
}
- void flipInverter();
-
public:
using UP = std::unique_ptr<MemoryIndex>;
using SP = std::shared_ptr<MemoryIndex>;