aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-01-05 17:30:39 +0100
committerTor Egge <Tor.Egge@online.no>2022-01-05 17:30:39 +0100
commiteed19c002fecdb424b5452ae7849755e6e132efb (patch)
treef9e7f857e27bf77dfc11b95c1f4414ab636e7392 /searchlib
parentfcce4873d66e5e5140fa470a22cbb3e752159ea2 (diff)
Factor out FieldMerger from Fusion.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp1
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt3
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.cpp417
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/field_merger.h54
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.cpp412
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion.h74
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp34
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h34
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h43
10 files changed, 624 insertions, 469 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index 1c86981372d..6794b9c0f5c 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/searchlib/fef/termfieldmatchdata.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/searchlib/index/schemautil.h>
#include <vespa/searchlib/memoryindex/document_inverter.h>
#include <vespa/searchlib/memoryindex/document_inverter_context.h>
#include <vespa/searchlib/memoryindex/field_index_collection.h>
diff --git a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
index 261874fb4c8..74a873a4e29 100644
--- a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt
@@ -10,11 +10,14 @@ vespa_add_library(searchlib_diskindex OBJECT
disktermblueprint.cpp
docidmapper.cpp
extposocc.cpp
+ field_merger.cpp
fieldreader.cpp
fieldwriter.cpp
field_length_scanner.cpp
fileheader.cpp
fusion.cpp
+ fusion_input_index.cpp
+ fusion_output_index.cpp
indexbuilder.cpp
pagedict4file.cpp
pagedict4randread.cpp
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
new file mode 100644
index 00000000000..375d30d3003
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp
@@ -0,0 +1,417 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "field_merger.h"
+#include "fieldreader.h"
+#include "field_length_scanner.h"
+#include "fusion_input_index.h"
+#include "fusion_output_index.h"
+#include "dictionarywordreader.h"
+#include "wordnummapper.h"
+#include <vespa/fastos/file.h>
+#include <vespa/searchlib/bitcompression/posocc_fields_params.h>
+#include <vespa/searchlib/common/i_flush_token.h>
+#include <vespa/searchlib/index/schemautil.h>
+#include <vespa/searchlib/util/filekit.h>
+#include <vespa/searchlib/util/dirtraverse.h>
+#include <vespa/searchlib/util/postingpriorityqueue.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/util/exceptions.h>
+
+#include <vespa/log/log.h>
+
+LOG_SETUP(".diskindex.field_merger");
+
+using search::FileKit;
+using search::bitcompression::PosOccFieldParams;
+using search::bitcompression::PosOccFieldsParams;
+using search::common::FileHeaderContext;
+using search::index::FieldLengthInfo;
+using search::index::PostingListParams;
+using search::index::Schema;
+using search::index::SchemaUtil;
+using search::index::schema::DataType;
+using vespalib::IllegalArgumentException;
+using vespalib::make_string;
+
+namespace search::diskindex {
+
+
+namespace {
+
+vespalib::string
+createTmpPath(const vespalib::string & base, uint32_t index) {
+ vespalib::asciistream os;
+ os << base;
+ os << "/tmpindex";
+ os << index;
+ return os.str();
+}
+
+}
+
+FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index)
+ : _id(id),
+ _field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()),
+ _fusion_out_index(fusion_out_index)
+{
+}
+
+FieldMerger::~FieldMerger() = default;
+
+void
+FieldMerger::make_tmp_dirs()
+{
+ for (const auto & index : _fusion_out_index.get_old_indexes()) {
+ vespalib::mkdir(createTmpPath(_field_dir, index.getIndex()), false);
+ }
+}
+
+bool
+FieldMerger::clean_tmp_dirs()
+{
+ uint32_t i = 0;
+ for (;;) {
+ vespalib::string tmpindexpath = createTmpPath(_field_dir, i);
+ FastOS_StatInfo statInfo;
+ if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) {
+ if (statInfo._error == FastOS_StatInfo::FileNotFound) {
+ break;
+ }
+ LOG(error, "Failed to stat tmpdir %s", tmpindexpath.c_str());
+ return false;
+ }
+ i++;
+ }
+ while (i > 0) {
+ i--;
+ vespalib::string tmpindexpath = createTmpPath(_field_dir, i);
+ search::DirectoryTraverse dt(tmpindexpath.c_str());
+ if (!dt.RemoveTree()) {
+ LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str());
+ return false;
+ }
+ }
+ return true;
+}
+
+bool
+FieldMerger::open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>> & readers, PostingPriorityQueue<DictionaryWordReader>& heap)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ for (auto & oi : _fusion_out_index.get_old_indexes()) {
+ auto reader(std::make_unique<DictionaryWordReader>());
+ const vespalib::string &tmpindexpath = createTmpPath(_field_dir, oi.getIndex());
+ const vespalib::string &oldindexpath = oi.getPath();
+ vespalib::string wordMapName = tmpindexpath + "/old2new.dat";
+ vespalib::string fieldDir(oldindexpath + "/" + index.getName());
+ vespalib::string dictName(fieldDir + "/dictionary");
+ const Schema &oldSchema = oi.getSchema();
+ if (!index.hasOldFields(oldSchema)) {
+ continue; // drop data
+ }
+ bool res = reader->open(dictName, wordMapName, _fusion_out_index.get_tune_file_indexing()._read);
+ if (!res) {
+ LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str());
+ return false;
+ }
+ reader->read();
+ if (reader->isValid()) {
+ readers.push_back(std::move(reader));
+ heap.initialAdd(readers.back().get());
+ }
+ }
+ return true;
+}
+
+bool
+FieldMerger::read_mapping_files(WordNumMappingList& list)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ for (const auto & oi : _fusion_out_index.get_old_indexes()) {
+ std::vector<uint32_t> oldIndexes;
+ const Schema &oldSchema = oi.getSchema();
+ if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) {
+ return false;
+ }
+ WordNumMapping &wordNumMapping = list[oi.getIndex()];
+ if (oldIndexes.empty()) {
+ wordNumMapping.noMappingFile();
+ continue;
+ }
+ if (!index.hasOldFields(oldSchema)) {
+ continue; // drop data
+ }
+
+ // Open word mapping file
+ vespalib::string old2newname = createTmpPath(_field_dir, oi.getIndex()) + "/old2new.dat";
+ wordNumMapping.readMappingFile(old2newname, _fusion_out_index.get_tune_file_indexing()._read);
+ }
+
+ return true;
+}
+
+bool
+FieldMerger::renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ vespalib::string indexName = index.getName();
+ LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
+
+ std::vector<std::unique_ptr<DictionaryWordReader>> readers;
+ PostingPriorityQueue<DictionaryWordReader> heap;
+ WordAggregator out;
+
+ if (!open_input_word_readers(readers, heap)) {
+ return false;
+ }
+ heap.merge(out, 4, flush_token);
+ if (flush_token.stop_requested()) {
+ return false;
+ }
+ assert(heap.empty());
+ numWordIds = out.getWordNum();
+
+ // Close files
+ for (auto &i : readers) {
+ i->close();
+ }
+
+ // Now read mapping files back into an array
+ // XXX: avoid this, and instead make the array here
+ if (!read_mapping_files(list)) {
+ return false;
+ }
+ LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str());
+
+ return true;
+}
+
+std::shared_ptr<FieldLengthScanner>
+FieldMerger::allocate_field_length_scanner()
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ if (index.use_interleaved_features()) {
+ PosOccFieldsParams fieldsParams;
+ fieldsParams.setSchemaParams(index.getSchema(), index.getIndex());
+ assert(fieldsParams.getNumFields() > 0);
+ const PosOccFieldParams &fieldParams = fieldsParams.getFieldParams()[0];
+ if (fieldParams._hasElements) {
+ for (const auto &old_index : _fusion_out_index.get_old_indexes()) {
+ const Schema &old_schema = old_index.getSchema();
+ if (index.hasOldFields(old_schema) &&
+ !index.has_matching_use_interleaved_features(old_schema)) {
+ return std::make_shared<FieldLengthScanner>(_fusion_out_index.get_doc_id_limit());
+ }
+ }
+ }
+ }
+ return std::shared_ptr<FieldLengthScanner>();
+}
+
+bool
+FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ auto field_length_scanner = allocate_field_length_scanner();
+ vespalib::string indexName = index.getName();
+ for (const auto &oi : _fusion_out_index.get_old_indexes()) {
+ const Schema &oldSchema = oi.getSchema();
+ if (!index.hasOldFields(oldSchema)) {
+ continue; // drop data
+ }
+ auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner);
+ reader->setup(list[oi.getIndex()], oi.getDocIdMapping());
+ if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) {
+ return false;
+ }
+ readers.push_back(std::move(reader));
+ }
+ return true;
+}
+
+bool
+FieldMerger::open_field_writer(FieldWriter& writer, const FieldLengthInfo& field_length_info)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ if (!writer.open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(),
+ index.use_interleaved_features(), index.getSchema(),
+ index.getIndex(),
+ field_length_info,
+ _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) {
+ throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", _field_dir.c_str()));
+ }
+ return true;
+}
+
+bool
+FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer)
+{
+ bool rawFormatOK = true;
+ bool cookedFormatOK = true;
+ PostingListParams featureParams;
+ PostingListParams outFeatureParams;
+ vespalib::string cookedFormat;
+ vespalib::string rawFormat;
+
+ if (!reader.isValid()) {
+ return true;
+ }
+ {
+ writer.getFeatureParams(featureParams);
+ cookedFormat = featureParams.getStr("cookedEncoding");
+ rawFormat = featureParams.getStr("encoding");
+ if (rawFormat == "") {
+ rawFormatOK = false; // Typically uncompressed file
+ }
+ outFeatureParams = featureParams;
+ }
+ {
+ reader.getFeatureParams(featureParams);
+ if (cookedFormat != featureParams.getStr("cookedEncoding")) {
+ cookedFormatOK = false;
+ }
+ if (rawFormat != featureParams.getStr("encoding")) {
+ rawFormatOK = false;
+ }
+ if (featureParams != outFeatureParams) {
+ rawFormatOK = false;
+ }
+ if (!reader.allowRawFeatures()) {
+ rawFormatOK = false; // Reader transforms data
+ }
+ }
+ if (!cookedFormatOK) {
+ LOG(error, "Cannot perform fusion, cooked feature formats don't match");
+ return false;
+ }
+ if (rawFormatOK) {
+ featureParams.clear();
+ featureParams.set("cooked", false);
+ reader.setFeatureParams(featureParams);
+ reader.getFeatureParams(featureParams);
+ if (featureParams.isSet("cookedEncoding") ||
+ rawFormat != featureParams.getStr("encoding")) {
+ rawFormatOK = false;
+ }
+ if (!rawFormatOK) {
+ LOG(error, "Cannot perform fusion, raw format setting failed");
+ return false;
+ }
+ LOG(debug, "Using raw feature format for fusion of posting files");
+ }
+ return true;
+}
+
+bool
+FieldMerger::setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap)
+{
+ for (auto &reader : readers) {
+ if (!select_cooked_or_raw_features(*reader, writer)) {
+ return false;
+ }
+ if (reader->isValid()) {
+ reader->read();
+ }
+ if (reader->isValid()) {
+ heap.initialAdd(reader.get());
+ }
+ }
+ return true;
+}
+
+bool
+FieldMerger::merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token)
+{
+ SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id);
+ std::vector<std::unique_ptr<FieldReader>> readers;
+ PostingPriorityQueue<FieldReader> heap;
+ /* OUTPUT */
+ FieldWriter fieldWriter(_fusion_out_index.get_doc_id_limit(), numWordIds);
+ vespalib::string indexName = index.getName();
+
+ if (!open_input_field_readers(list, readers)) {
+ return false;
+ }
+ FieldLengthInfo field_length_info;
+ if (!readers.empty()) {
+ field_length_info = readers.back()->get_field_length_info();
+ }
+ if (!open_field_writer(fieldWriter, field_length_info)) {
+ return false;
+ }
+ if (!setup_merge_heap(readers, fieldWriter, heap)) {
+ return false;
+ }
+
+ heap.merge(fieldWriter, 4, flush_token);
+ if (flush_token.stop_requested()) {
+ return false;
+ }
+ assert(heap.empty());
+
+ for (auto &reader : readers) {
+ if (!reader->close()) {
+ return false;
+ }
+ }
+ if (!fieldWriter.close()) {
+ throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s", _field_dir.c_str()));
+ }
+ return true;
+}
+
+bool
+FieldMerger::merge_field(std::shared_ptr<IFlushToken> flush_token)
+{
+ const Schema &schema = _fusion_out_index.get_schema();
+ SchemaUtil::IndexIterator index(schema, _id);
+ const vespalib::string &indexName = index.getName();
+ SchemaUtil::IndexSettings settings = index.getIndexSettings();
+ if (settings.hasError()) {
+ return false;
+ }
+
+ if (FileKit::hasStamp(_field_dir + "/.mergeocc_done")) {
+ return true;
+ }
+ vespalib::mkdir(_field_dir, false);
+
+ LOG(debug, "merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+
+ make_tmp_dirs();
+
+ WordNumMappingList list(_fusion_out_index.get_old_indexes().size());
+ uint64_t numWordIds(0);
+ if (!renumber_word_ids(list, numWordIds, *flush_token)) {
+ if (flush_token->stop_requested()) {
+ return false;
+ }
+ LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+ return false;
+ }
+
+ // Tokamak
+ bool res = merge_postings(list, numWordIds, *flush_token);
+ if (!res) {
+ if (flush_token->stop_requested()) {
+ return false;
+ }
+ throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
+ indexName.c_str(), _field_dir.c_str()));
+ }
+ if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) {
+ return false;
+ }
+ vespalib::File::sync(_field_dir);
+
+ if (!clean_tmp_dirs()) {
+ return false;
+ }
+
+ LOG(debug, "Finished merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str());
+
+ return true;
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
new file mode 100644
index 00000000000..31c2818cf17
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -0,0 +1,54 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+namespace search {
+class IFlushToken;
+template <class IN> class PostingPriorityQueue;
+}
+
+namespace search::index { class FieldLengthInfo; }
+
+namespace search::diskindex {
+
+class DictionaryWordReader;
+class FieldLengthScanner;
+class FieldReader;
+class FieldWriter;
+class FusionOutputIndex;
+class WordNumMapping;
+
+/*
+ * Class for merging posting lists for a single field during fusion.
+ */
+class FieldMerger
+{
+ using WordNumMappingList = std::vector<WordNumMapping>;
+
+ uint32_t _id;
+ vespalib::string _field_dir;
+ const FusionOutputIndex& _fusion_out_index;
+
+ void make_tmp_dirs();
+ bool clean_tmp_dirs();
+ bool open_input_word_readers(std::vector<std::unique_ptr<DictionaryWordReader>>& readers, PostingPriorityQueue<DictionaryWordReader>& heap);
+ bool read_mapping_files(WordNumMappingList& list);
+ bool renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token);
+ std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner();
+ bool open_input_field_readers(const WordNumMappingList& list, std::vector<std::unique_ptr<FieldReader>>& readers);
+ bool open_field_writer(FieldWriter& writer, const index::FieldLengthInfo& field_length_info);
+ bool select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer);
+ bool setup_merge_heap(const std::vector<std::unique_ptr<FieldReader>>& readers, FieldWriter& writer, PostingPriorityQueue<FieldReader>& heap);
+ bool merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token);
+public:
+ FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index);
+ ~FieldMerger();
+ bool merge_field(std::shared_ptr<IFlushToken> flush_token);
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
index 67c92e18b48..5891849959b 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "fusion.h"
+#include "fusion_input_index.h"
+#include "field_merger.h"
#include "fieldreader.h"
#include "dictionarywordreader.h"
#include "field_length_scanner.h"
@@ -46,15 +48,6 @@ namespace search::diskindex {
namespace {
-vespalib::string
-createTmpPath(const vespalib::string & base, uint32_t index) {
- vespalib::asciistream os;
- os << base;
- os << "/tmpindex";
- os << index;
- return os.str();
-}
-
std::vector<FusionInputIndex>
createInputIndexes(const std::vector<vespalib::string> & sources, const SelectorArray &selector)
{
@@ -69,37 +62,11 @@ createInputIndexes(const std::vector<vespalib::string> & sources, const Selector
}
-FusionInputIndex::FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray &selector)
- : _path(path),
- _index(index),
- _schema()
-{
- vespalib::string fname = path + "/schema.txt";
- if ( ! _schema.loadFromFile(fname)) {
- throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str()));
- }
- if ( ! SchemaUtil::validateSchema(_schema)) {
- throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str()));
- }
- if (!_docIdMapping.readDocIdLimit(path)) {
- throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str()));
- }
- _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index);
-}
-
-FusionInputIndex::~FusionInputIndex() = default;
-
Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir,
const std::vector<vespalib::string> & sources, const SelectorArray &selector,
bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing,
const FileHeaderContext &fileHeaderContext)
- : _schema(schema),
- _oldIndexes(createInputIndexes(sources, selector)),
- _docIdLimit(docIdLimit),
- _dynamicKPosIndexFormat(dynamicKPosIndexFormat),
- _outDir(dir),
- _tuneFileIndexing(tuneFileIndexing),
- _fileHeaderContext(fileHeaderContext)
+ : _fusion_out_index(schema, dir, createInputIndexes(sources, selector), docIdLimit, dynamicKPosIndexFormat, tuneFileIndexing, fileHeaderContext)
{
if (!readSchemaFiles()) {
throw IllegalArgumentException("Cannot read schema files for source indexes");
@@ -109,74 +76,6 @@ Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::strin
Fusion::~Fusion() = default;
bool
-Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- std::vector<std::unique_ptr<DictionaryWordReader> > & readers,
- PostingPriorityQueue<DictionaryWordReader> &heap)
-{
- for (auto & oi : _oldIndexes) {
- auto reader(std::make_unique<DictionaryWordReader>());
- const vespalib::string &tmpindexpath = createTmpPath(dir, oi.getIndex());
- const vespalib::string &oldindexpath = oi.getPath();
- vespalib::string wordMapName = tmpindexpath + "/old2new.dat";
- vespalib::string fieldDir(oldindexpath + "/" + index.getName());
- vespalib::string dictName(fieldDir + "/dictionary");
- const Schema &oldSchema = oi.getSchema();
- if (!index.hasOldFields(oldSchema)) {
- continue; // drop data
- }
- bool res = reader->open(dictName, wordMapName, _tuneFileIndexing._read);
- if (!res) {
- LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str());
- return false;
- }
- reader->read();
- if (reader->isValid()) {
- readers.push_back(std::move(reader));
- heap.initialAdd(readers.back().get());
- }
- }
- return true;
-}
-
-
-bool
-Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- WordNumMappingList & list, uint64_t & numWordIds, const IFlushToken& flush_token)
-{
- vespalib::string indexName = index.getName();
- LOG(debug, "Renumber word IDs for field %s", indexName.c_str());
-
- std::vector<std::unique_ptr<DictionaryWordReader>> readers;
- PostingPriorityQueue<DictionaryWordReader> heap;
- WordAggregator out;
-
- if (!openInputWordReaders(dir, index, readers, heap)) {
- return false;
- }
- heap.merge(out, 4, flush_token);
- if (flush_token.stop_requested()) {
- return false;
- }
- assert(heap.empty());
- numWordIds = out.getWordNum();
-
- // Close files
- for (auto &i : readers) {
- i->close();
- }
-
- // Now read mapping files back into an array
- // XXX: avoid this, and instead make the array here
- if (!readMappingFiles(dir, &index, list)) {
- return false;
- }
- LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str());
-
- return true;
-}
-
-
-bool
Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token)
{
const Schema &schema = getSchema();
@@ -187,7 +86,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushT
for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) {
concurrent.wait();
executor.execute(vespalib::makeLambdaTask([this, index=iter.getIndex(), &failed, &done, &concurrent, flush_token]() {
- if (!mergeField(index, flush_token)) {
+ FieldMerger merger(index, _fusion_out_index);
+ if (!merger.merge_field(flush_token)) {
failed++;
}
concurrent.post();
@@ -202,308 +102,6 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushT
bool
-Fusion::mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token)
-{
- typedef SchemaUtil::IndexIterator IndexIterator;
- typedef SchemaUtil::IndexSettings IndexSettings;
-
- const Schema &schema = getSchema();
- IndexIterator index(schema, id);
- const vespalib::string &indexName = index.getName();
- IndexSettings settings = index.getIndexSettings();
- if (settings.hasError()) {
- return false;
- }
- vespalib::string indexDir = _outDir + "/" + indexName;
-
- if (FileKit::hasStamp(indexDir + "/.mergeocc_done")) {
- return true;
- }
- vespalib::mkdir(indexDir, false);
-
- LOG(debug, "mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str());
-
- makeTmpDirs(indexDir);
-
- WordNumMappingList list(_oldIndexes.size());
- uint64_t numWordIds(0);
- if (!renumberFieldWordIds(indexDir, index, list, numWordIds, *flush_token)) {
- if (flush_token->stop_requested()) {
- return false;
- }
- LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str());
- return false;
- }
-
- // Tokamak
- bool res = mergeFieldPostings(index, list, numWordIds, *flush_token);
- if (!res) {
- if (flush_token->stop_requested()) {
- return false;
- }
- throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s",
- indexName.c_str(), indexDir.c_str()));
- }
- if (!FileKit::createStamp(indexDir + "/.mergeocc_done")) {
- return false;
- }
- vespalib::File::sync(indexDir);
-
- if (!cleanTmpDirs(indexDir)) {
- return false;
- }
-
- LOG(debug, "Finished mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str());
-
- return true;
-}
-
-template <class Reader, class Writer>
-bool
-Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer)
-{
- bool rawFormatOK = true;
- bool cookedFormatOK = true;
- PostingListParams featureParams;
- PostingListParams outFeatureParams;
- vespalib::string cookedFormat;
- vespalib::string rawFormat;
-
- if (!reader.isValid()) {
- return true;
- }
- {
- writer.getFeatureParams(featureParams);
- cookedFormat = featureParams.getStr("cookedEncoding");
- rawFormat = featureParams.getStr("encoding");
- if (rawFormat == "") {
- rawFormatOK = false; // Typically uncompressed file
- }
- outFeatureParams = featureParams;
- }
- {
- reader.getFeatureParams(featureParams);
- if (cookedFormat != featureParams.getStr("cookedEncoding")) {
- cookedFormatOK = false;
- }
- if (rawFormat != featureParams.getStr("encoding")) {
- rawFormatOK = false;
- }
- if (featureParams != outFeatureParams) {
- rawFormatOK = false;
- }
- if (!reader.allowRawFeatures()) {
- rawFormatOK = false; // Reader transforms data
- }
- }
- if (!cookedFormatOK) {
- LOG(error, "Cannot perform fusion, cooked feature formats don't match");
- return false;
- }
- if (rawFormatOK) {
- featureParams.clear();
- featureParams.set("cooked", false);
- reader.setFeatureParams(featureParams);
- reader.getFeatureParams(featureParams);
- if (featureParams.isSet("cookedEncoding") ||
- rawFormat != featureParams.getStr("encoding")) {
- rawFormatOK = false;
- }
- if (!rawFormatOK) {
- LOG(error, "Cannot perform fusion, raw format setting failed");
- return false;
- }
- LOG(debug, "Using raw feature format for fusion of posting files");
- }
- return true;
-}
-
-
-std::shared_ptr<FieldLengthScanner>
-Fusion::allocate_field_length_scanner(const SchemaUtil::IndexIterator &index)
-{
- if (index.use_interleaved_features()) {
- PosOccFieldsParams fieldsParams;
- fieldsParams.setSchemaParams(index.getSchema(), index.getIndex());
- assert(fieldsParams.getNumFields() > 0);
- const PosOccFieldParams &fieldParams = fieldsParams.getFieldParams()[0];
- if (fieldParams._hasElements) {
- for (const auto &old_index : _oldIndexes) {
- const Schema &old_schema = old_index.getSchema();
- if (index.hasOldFields(old_schema) &&
- !index.has_matching_use_interleaved_features(old_schema)) {
- return std::make_shared<FieldLengthScanner>(_docIdLimit);
- }
- }
- }
- }
- return std::shared_ptr<FieldLengthScanner>();
-}
-
-bool
-Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list,
- std::vector<std::unique_ptr<FieldReader> > & readers)
-{
- auto field_length_scanner = allocate_field_length_scanner(index);
- vespalib::string indexName = index.getName();
- for (const auto &oi : _oldIndexes) {
- const Schema &oldSchema = oi.getSchema();
- if (!index.hasOldFields(oldSchema)) {
- continue; // drop data
- }
- auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner);
- reader->setup(list[oi.getIndex()], oi.getDocIdMapping());
- if (!reader->open(oi.getPath() + "/" + indexName + "/", _tuneFileIndexing._read)) {
- return false;
- }
- readers.push_back(std::move(reader));
- }
- return true;
-}
-
-
-bool
-Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter &writer, const FieldLengthInfo &field_length_info)
-{
- vespalib::string dir = _outDir + "/" + index.getName();
-
- if (!writer.open(dir + "/", 64, 262144, _dynamicKPosIndexFormat,
- index.use_interleaved_features(), index.getSchema(),
- index.getIndex(),
- field_length_info,
- _tuneFileIndexing._write, _fileHeaderContext)) {
- throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", dir.c_str()));
- }
- return true;
-}
-
-
-bool
-Fusion::setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers,
- FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap)
-{
- for (auto &reader : readers) {
- if (!selectCookedOrRawFeatures(*reader, writer)) {
- return false;
- }
- if (reader->isValid()) {
- reader->read();
- }
- if (reader->isValid()) {
- heap.initialAdd(reader.get());
- }
- }
- return true;
-}
-
-
-bool
-Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token)
-{
- std::vector<std::unique_ptr<FieldReader>> readers;
- PostingPriorityQueue<FieldReader> heap;
- /* OUTPUT */
- FieldWriter fieldWriter(_docIdLimit, numWordIds);
- vespalib::string indexName = index.getName();
-
- if (!openInputFieldReaders(index, list, readers)) {
- return false;
- }
- FieldLengthInfo field_length_info;
- if (!readers.empty()) {
- field_length_info = readers.back()->get_field_length_info();
- }
- if (!openFieldWriter(index, fieldWriter, field_length_info)) {
- return false;
- }
- if (!setupMergeHeap(readers, fieldWriter, heap)) {
- return false;
- }
-
- heap.merge(fieldWriter, 4, flush_token);
- if (flush_token.stop_requested()) {
- return false;
- }
- assert(heap.empty());
-
- for (auto &reader : readers) {
- if (!reader->close()) {
- return false;
- }
- }
- if (!fieldWriter.close()) {
- throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s/%s",
- _outDir.c_str(), indexName.c_str()));
- }
- return true;
-}
-
-
-bool
-Fusion::readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list)
-{
- for (const auto & oi : _oldIndexes) {
- std::vector<uint32_t> oldIndexes;
- const Schema &oldSchema = oi.getSchema();
- if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) {
- return false;
- }
- WordNumMapping &wordNumMapping = list[oi.getIndex()];
- if (oldIndexes.empty()) {
- wordNumMapping.noMappingFile();
- continue;
- }
- if (index && !index->hasOldFields(oldSchema)) {
- continue; // drop data
- }
-
- // Open word mapping file
- vespalib::string old2newname = createTmpPath(dir, oi.getIndex()) + "/old2new.dat";
- wordNumMapping.readMappingFile(old2newname, _tuneFileIndexing._read);
- }
-
- return true;
-}
-
-
-void
-Fusion::makeTmpDirs(const vespalib::string & dir)
-{
- for (const auto & index : _oldIndexes) {
- vespalib::mkdir(createTmpPath(dir, index.getIndex()), false);
- }
-}
-
-bool
-Fusion::cleanTmpDirs(const vespalib::string & dir)
-{
- uint32_t i = 0;
- for (;;) {
- vespalib::string tmpindexpath = createTmpPath(dir, i);
- FastOS_StatInfo statInfo;
- if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) {
- if (statInfo._error == FastOS_StatInfo::FileNotFound) {
- break;
- }
- LOG(error, "Failed to stat tmpdir %s", tmpindexpath.c_str());
- return false;
- }
- i++;
- }
- while (i > 0) {
- i--;
- vespalib::string tmpindexpath = createTmpPath(dir, i);
- search::DirectoryTraverse dt(tmpindexpath.c_str());
- if (!dt.RemoveTree()) {
- LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str());
- return false;
- }
- }
- return true;
-}
-
-
-bool
Fusion::checkSchemaCompat()
{
/* TODO: Check compatibility */
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h
index c1acf41a043..22dda4d6edf 100644
--- a/searchlib/src/vespa/searchlib/diskindex/fusion.h
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h
@@ -2,88 +2,38 @@
#pragma once
-#include "docidmapper.h"
-#include "wordnummapper.h"
+#include "fusion_output_index.h"
-#include <vespa/searchlib/index/schemautil.h>
#include <vespa/vespalib/util/threadexecutor.h>
-namespace search { template <class IN> class PostingPriorityQueue; }
namespace search {
class IFlushToken;
+template <class IN> class PostingPriorityQueue;
class TuneFileIndexing;
}
-namespace search::common { class FileHeaderContext; }
-namespace search::index { class FieldLengthInfo; }
-namespace search::diskindex {
-
-class FieldLengthScanner;
-class FieldReader;
-class FieldWriter;
-class DictionaryWordReader;
-
-class FusionInputIndex
-{
-private :
- vespalib::string _path;
- uint32_t _index;
- index::Schema _schema;
- DocIdMapping _docIdMapping;
-
-public:
- FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray & selector);
- FusionInputIndex(FusionInputIndex &&) = default;
- FusionInputIndex & operator = (FusionInputIndex &&) = default;
- ~FusionInputIndex();
+namespace vespalib { template <typename T> class Array; }
- const vespalib::string & getPath() const { return _path; }
- uint32_t getIndex() const { return _index; }
- const DocIdMapping & getDocIdMapping() const { return _docIdMapping; }
- const index::Schema &getSchema() const { return _schema; }
-};
+namespace search::diskindex {
+using SelectorArray = vespalib::Array<uint8_t>;
+/*
+ * Class that handles fusion of a set of disk indexes into a new disk
+ * index.
+ */
class Fusion
{
private:
using Schema = index::Schema;
- using SchemaUtil = index::SchemaUtil;
- using WordNumMappingList = std::vector<WordNumMapping>;
- bool mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr<IFlushToken> flush_token);
- bool mergeField(uint32_t id, std::shared_ptr<IFlushToken> flush_token);
- std::shared_ptr<FieldLengthScanner> allocate_field_length_scanner(const SchemaUtil::IndexIterator &index);
- bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list,
- std::vector<std::unique_ptr<FieldReader> > & readers);
- bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer, const index::FieldLengthInfo &field_length_info);
- bool setupMergeHeap(const std::vector<std::unique_ptr<FieldReader> > & readers,
- FieldWriter &writer, PostingPriorityQueue<FieldReader> &heap);
- bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token);
- bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- std::vector<std::unique_ptr<DictionaryWordReader> > &readers,
- PostingPriorityQueue<DictionaryWordReader> &heap);
- bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index,
- WordNumMappingList & list, uint64_t& numWordIds, const IFlushToken& flush_token);
- void makeTmpDirs(const vespalib::string & dir);
- bool cleanTmpDirs(const vespalib::string & dir);
+ bool mergeFields(vespalib::ThreadExecutor& executor, std::shared_ptr<IFlushToken> flush_token);
bool readSchemaFiles();
bool checkSchemaCompat();
- template <class Reader, class Writer>
- static bool selectCookedOrRawFeatures(Reader &reader, Writer &writer);
-
- bool readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list);
- const Schema &getSchema() const { return _schema; }
-
- const Schema &_schema; // External ownership
- std::vector<FusionInputIndex> _oldIndexes;
- const uint32_t _docIdLimit;
- const bool _dynamicKPosIndexFormat;
- vespalib::string _outDir;
+ const Schema &getSchema() const { return _fusion_out_index.get_schema(); }
- const TuneFileIndexing &_tuneFileIndexing;
- const common::FileHeaderContext &_fileHeaderContext;
+ FusionOutputIndex _fusion_out_index;
public:
Fusion(const Fusion &) = delete;
Fusion& operator=(const Fusion &) = delete;
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp
new file mode 100644
index 00000000000..278d095b639
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "fusion_input_index.h"
+#include <vespa/searchlib/index/schemautil.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+using search::index::SchemaUtil;
+using vespalib::IllegalArgumentException;
+using vespalib::make_string;
+
+namespace search::diskindex {
+
+FusionInputIndex::FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector)
+ : _path(path),
+ _index(index),
+ _schema()
+{
+ vespalib::string fname = path + "/schema.txt";
+ if ( ! _schema.loadFromFile(fname)) {
+ throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str()));
+ }
+ if ( ! SchemaUtil::validateSchema(_schema)) {
+ throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str()));
+ }
+ if (!_docIdMapping.readDocIdLimit(path)) {
+ throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str()));
+ }
+ _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index);
+}
+
+FusionInputIndex::~FusionInputIndex() = default;
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h
new file mode 100644
index 00000000000..fd7bb8f0256
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "docidmapper.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace search::diskindex {
+
+/*
+ * Class representing an index used as input for fusion.
+ */
+class FusionInputIndex
+{
+private:
+ vespalib::string _path;
+ uint32_t _index;
+ index::Schema _schema;
+ DocIdMapping _docIdMapping;
+
+public:
+ FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector);
+ FusionInputIndex(FusionInputIndex&&) = default;
+ FusionInputIndex & operator = (FusionInputIndex&&) = default;
+ ~FusionInputIndex();
+
+ const vespalib::string& getPath() const noexcept { return _path; }
+ uint32_t getIndex() const noexcept { return _index; }
+ const DocIdMapping& getDocIdMapping() const noexcept { return _docIdMapping; }
+ const index::Schema& getSchema() const noexcept { return _schema; }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
new file mode 100644
index 00000000000..66ec0889cbe
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp
@@ -0,0 +1,21 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "fusion_output_index.h"
+#include "fusion_input_index.h"
+
+namespace search::diskindex {
+
+FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context)
+ : _schema(schema),
+ _path(path),
+ _old_indexes(std::move(old_indexes)),
+ _doc_id_limit(doc_id_limit),
+ _dynamic_k_pos_index_format(dynamic_k_pos_index_format),
+ _tune_file_indexing(tune_file_indexing),
+ _file_header_context(file_header_context)
+{
+}
+
+FusionOutputIndex::~FusionOutputIndex() = default;
+
+}
diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
new file mode 100644
index 00000000000..c366b111363
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h
@@ -0,0 +1,43 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <vector>
+
+namespace search { class TuneFileIndexing; }
+namespace search::common { class FileHeaderContext; }
+namespace search::index { class Schema; }
+
+namespace search::diskindex {
+
+class FusionInputIndex;
+
+/*
+ * Class representing the portions of fusion output index state needed by
+ * FieldMerger.
+ */
+class FusionOutputIndex
+{
+private:
+ const index::Schema& _schema;
+ const vespalib::string _path;
+ const std::vector<FusionInputIndex> _old_indexes;
+ const uint32_t _doc_id_limit;
+ const bool _dynamic_k_pos_index_format;
+ const TuneFileIndexing& _tune_file_indexing;
+ const common::FileHeaderContext& _file_header_context;
+public:
+ FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector<FusionInputIndex> old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context);
+ ~FusionOutputIndex();
+
+ const index::Schema& get_schema() const noexcept { return _schema; }
+ const vespalib::string& get_path() const noexcept { return _path; }
+ const std::vector<FusionInputIndex>& get_old_indexes() const noexcept { return _old_indexes; }
+ uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; }
+ bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; }
+ const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; }
+ const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; }
+};
+
+}