From eed19c002fecdb424b5452ae7849755e6e132efb Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 5 Jan 2022 17:30:39 +0100 Subject: Factor out FieldMerger from Fusion. --- .../src/tests/diskindex/fusion/fusion_test.cpp | 1 + .../src/vespa/searchlib/diskindex/CMakeLists.txt | 3 + .../src/vespa/searchlib/diskindex/field_merger.cpp | 417 +++++++++++++++++++++ .../src/vespa/searchlib/diskindex/field_merger.h | 54 +++ searchlib/src/vespa/searchlib/diskindex/fusion.cpp | 412 +------------------- searchlib/src/vespa/searchlib/diskindex/fusion.h | 74 +--- .../searchlib/diskindex/fusion_input_index.cpp | 34 ++ .../vespa/searchlib/diskindex/fusion_input_index.h | 34 ++ .../searchlib/diskindex/fusion_output_index.cpp | 21 ++ .../searchlib/diskindex/fusion_output_index.h | 43 +++ 10 files changed, 624 insertions(+), 469 deletions(-) create mode 100644 searchlib/src/vespa/searchlib/diskindex/field_merger.cpp create mode 100644 searchlib/src/vespa/searchlib/diskindex/field_merger.h create mode 100644 searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp create mode 100644 searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h create mode 100644 searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp create mode 100644 searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h (limited to 'searchlib') diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 1c86981372d..6794b9c0f5c 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt index 261874fb4c8..74a873a4e29 100644 --- a/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/diskindex/CMakeLists.txt @@ -10,11 +10,14 @@ vespa_add_library(searchlib_diskindex OBJECT disktermblueprint.cpp docidmapper.cpp extposocc.cpp + field_merger.cpp fieldreader.cpp fieldwriter.cpp field_length_scanner.cpp fileheader.cpp fusion.cpp + fusion_input_index.cpp + fusion_output_index.cpp indexbuilder.cpp pagedict4file.cpp pagedict4randread.cpp diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp new file mode 100644 index 00000000000..375d30d3003 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.cpp @@ -0,0 +1,417 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "field_merger.h" +#include "fieldreader.h" +#include "field_length_scanner.h" +#include "fusion_input_index.h" +#include "fusion_output_index.h" +#include "dictionarywordreader.h" +#include "wordnummapper.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +LOG_SETUP(".diskindex.field_merger"); + +using search::FileKit; +using search::bitcompression::PosOccFieldParams; +using search::bitcompression::PosOccFieldsParams; +using search::common::FileHeaderContext; +using search::index::FieldLengthInfo; +using search::index::PostingListParams; +using search::index::Schema; +using search::index::SchemaUtil; +using search::index::schema::DataType; +using vespalib::IllegalArgumentException; +using vespalib::make_string; + +namespace search::diskindex { + + +namespace { + +vespalib::string +createTmpPath(const vespalib::string & base, uint32_t index) { + vespalib::asciistream os; + os << base; + os << "/tmpindex"; + os << index; + return os.str(); +} + +} + +FieldMerger::FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index) + : _id(id), + _field_dir(fusion_out_index.get_path() + "/" + SchemaUtil::IndexIterator(fusion_out_index.get_schema(), id).getName()), + _fusion_out_index(fusion_out_index) +{ +} + +FieldMerger::~FieldMerger() = default; + +void +FieldMerger::make_tmp_dirs() +{ + for (const auto & index : _fusion_out_index.get_old_indexes()) { + vespalib::mkdir(createTmpPath(_field_dir, index.getIndex()), false); + } +} + +bool +FieldMerger::clean_tmp_dirs() +{ + uint32_t i = 0; + for (;;) { + vespalib::string tmpindexpath = createTmpPath(_field_dir, i); + FastOS_StatInfo statInfo; + if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) { + if (statInfo._error == FastOS_StatInfo::FileNotFound) { + break; + } + LOG(error, "Failed to stat tmpdir %s", tmpindexpath.c_str()); + return false; + } + i++; + } + while (i > 0) { + i--; + vespalib::string tmpindexpath = createTmpPath(_field_dir, i); + search::DirectoryTraverse dt(tmpindexpath.c_str()); + if (!dt.RemoveTree()) { + LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str()); + return false; + } + } + return true; +} + +bool +FieldMerger::open_input_word_readers(std::vector> & readers, PostingPriorityQueue& heap) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + for (auto & oi : _fusion_out_index.get_old_indexes()) { + auto reader(std::make_unique()); + const vespalib::string &tmpindexpath = createTmpPath(_field_dir, oi.getIndex()); + const vespalib::string &oldindexpath = oi.getPath(); + vespalib::string wordMapName = tmpindexpath + "/old2new.dat"; + vespalib::string fieldDir(oldindexpath + "/" + index.getName()); + vespalib::string dictName(fieldDir + "/dictionary"); + const Schema &oldSchema = oi.getSchema(); + if (!index.hasOldFields(oldSchema)) { + continue; // drop data + } + bool res = reader->open(dictName, wordMapName, _fusion_out_index.get_tune_file_indexing()._read); + if (!res) { + LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str()); + return false; + } + reader->read(); + if (reader->isValid()) { + readers.push_back(std::move(reader)); + heap.initialAdd(readers.back().get()); + } + } + return true; +} + +bool +FieldMerger::read_mapping_files(WordNumMappingList& list) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + for (const auto & oi : _fusion_out_index.get_old_indexes()) { + std::vector oldIndexes; + const Schema &oldSchema = oi.getSchema(); + if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) { + return false; + } + WordNumMapping &wordNumMapping = list[oi.getIndex()]; + if (oldIndexes.empty()) { + wordNumMapping.noMappingFile(); + continue; + } + if (!index.hasOldFields(oldSchema)) { + continue; // drop data + } + + // Open word mapping file + vespalib::string old2newname = createTmpPath(_field_dir, oi.getIndex()) + "/old2new.dat"; + wordNumMapping.readMappingFile(old2newname, _fusion_out_index.get_tune_file_indexing()._read); + } + + return true; +} + +bool +FieldMerger::renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + vespalib::string indexName = index.getName(); + LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); + + std::vector> readers; + PostingPriorityQueue heap; + WordAggregator out; + + if (!open_input_word_readers(readers, heap)) { + return false; + } + heap.merge(out, 4, flush_token); + if (flush_token.stop_requested()) { + return false; + } + assert(heap.empty()); + numWordIds = out.getWordNum(); + + // Close files + for (auto &i : readers) { + i->close(); + } + + // Now read mapping files back into an array + // XXX: avoid this, and instead make the array here + if (!read_mapping_files(list)) { + return false; + } + LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str()); + + return true; +} + +std::shared_ptr +FieldMerger::allocate_field_length_scanner() +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + if (index.use_interleaved_features()) { + PosOccFieldsParams fieldsParams; + fieldsParams.setSchemaParams(index.getSchema(), index.getIndex()); + assert(fieldsParams.getNumFields() > 0); + const PosOccFieldParams &fieldParams = fieldsParams.getFieldParams()[0]; + if (fieldParams._hasElements) { + for (const auto &old_index : _fusion_out_index.get_old_indexes()) { + const Schema &old_schema = old_index.getSchema(); + if (index.hasOldFields(old_schema) && + !index.has_matching_use_interleaved_features(old_schema)) { + return std::make_shared(_fusion_out_index.get_doc_id_limit()); + } + } + } + } + return std::shared_ptr(); +} + +bool +FieldMerger::open_input_field_readers(const WordNumMappingList& list, std::vector>& readers) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + auto field_length_scanner = allocate_field_length_scanner(); + vespalib::string indexName = index.getName(); + for (const auto &oi : _fusion_out_index.get_old_indexes()) { + const Schema &oldSchema = oi.getSchema(); + if (!index.hasOldFields(oldSchema)) { + continue; // drop data + } + auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner); + reader->setup(list[oi.getIndex()], oi.getDocIdMapping()); + if (!reader->open(oi.getPath() + "/" + indexName + "/", _fusion_out_index.get_tune_file_indexing()._read)) { + return false; + } + readers.push_back(std::move(reader)); + } + return true; +} + +bool +FieldMerger::open_field_writer(FieldWriter& writer, const FieldLengthInfo& field_length_info) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + if (!writer.open(_field_dir + "/", 64, 262144, _fusion_out_index.get_dynamic_k_pos_index_format(), + index.use_interleaved_features(), index.getSchema(), + index.getIndex(), + field_length_info, + _fusion_out_index.get_tune_file_indexing()._write, _fusion_out_index.get_file_header_context())) { + throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", _field_dir.c_str())); + } + return true; +} + +bool +FieldMerger::select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer) +{ + bool rawFormatOK = true; + bool cookedFormatOK = true; + PostingListParams featureParams; + PostingListParams outFeatureParams; + vespalib::string cookedFormat; + vespalib::string rawFormat; + + if (!reader.isValid()) { + return true; + } + { + writer.getFeatureParams(featureParams); + cookedFormat = featureParams.getStr("cookedEncoding"); + rawFormat = featureParams.getStr("encoding"); + if (rawFormat == "") { + rawFormatOK = false; // Typically uncompressed file + } + outFeatureParams = featureParams; + } + { + reader.getFeatureParams(featureParams); + if (cookedFormat != featureParams.getStr("cookedEncoding")) { + cookedFormatOK = false; + } + if (rawFormat != featureParams.getStr("encoding")) { + rawFormatOK = false; + } + if (featureParams != outFeatureParams) { + rawFormatOK = false; + } + if (!reader.allowRawFeatures()) { + rawFormatOK = false; // Reader transforms data + } + } + if (!cookedFormatOK) { + LOG(error, "Cannot perform fusion, cooked feature formats don't match"); + return false; + } + if (rawFormatOK) { + featureParams.clear(); + featureParams.set("cooked", false); + reader.setFeatureParams(featureParams); + reader.getFeatureParams(featureParams); + if (featureParams.isSet("cookedEncoding") || + rawFormat != featureParams.getStr("encoding")) { + rawFormatOK = false; + } + if (!rawFormatOK) { + LOG(error, "Cannot perform fusion, raw format setting failed"); + return false; + } + LOG(debug, "Using raw feature format for fusion of posting files"); + } + return true; +} + +bool +FieldMerger::setup_merge_heap(const std::vector>& readers, FieldWriter& writer, PostingPriorityQueue& heap) +{ + for (auto &reader : readers) { + if (!select_cooked_or_raw_features(*reader, writer)) { + return false; + } + if (reader->isValid()) { + reader->read(); + } + if (reader->isValid()) { + heap.initialAdd(reader.get()); + } + } + return true; +} + +bool +FieldMerger::merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token) +{ + SchemaUtil::IndexIterator index(_fusion_out_index.get_schema(), _id); + std::vector> readers; + PostingPriorityQueue heap; + /* OUTPUT */ + FieldWriter fieldWriter(_fusion_out_index.get_doc_id_limit(), numWordIds); + vespalib::string indexName = index.getName(); + + if (!open_input_field_readers(list, readers)) { + return false; + } + FieldLengthInfo field_length_info; + if (!readers.empty()) { + field_length_info = readers.back()->get_field_length_info(); + } + if (!open_field_writer(fieldWriter, field_length_info)) { + return false; + } + if (!setup_merge_heap(readers, fieldWriter, heap)) { + return false; + } + + heap.merge(fieldWriter, 4, flush_token); + if (flush_token.stop_requested()) { + return false; + } + assert(heap.empty()); + + for (auto &reader : readers) { + if (!reader->close()) { + return false; + } + } + if (!fieldWriter.close()) { + throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s", _field_dir.c_str())); + } + return true; +} + +bool +FieldMerger::merge_field(std::shared_ptr flush_token) +{ + const Schema &schema = _fusion_out_index.get_schema(); + SchemaUtil::IndexIterator index(schema, _id); + const vespalib::string &indexName = index.getName(); + SchemaUtil::IndexSettings settings = index.getIndexSettings(); + if (settings.hasError()) { + return false; + } + + if (FileKit::hasStamp(_field_dir + "/.mergeocc_done")) { + return true; + } + vespalib::mkdir(_field_dir, false); + + LOG(debug, "merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + + make_tmp_dirs(); + + WordNumMappingList list(_fusion_out_index.get_old_indexes().size()); + uint64_t numWordIds(0); + if (!renumber_word_ids(list, numWordIds, *flush_token)) { + if (flush_token->stop_requested()) { + return false; + } + LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + return false; + } + + // Tokamak + bool res = merge_postings(list, numWordIds, *flush_token); + if (!res) { + if (flush_token->stop_requested()) { + return false; + } + throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", + indexName.c_str(), _field_dir.c_str())); + } + if (!FileKit::createStamp(_field_dir + "/.mergeocc_done")) { + return false; + } + vespalib::File::sync(_field_dir); + + if (!clean_tmp_dirs()) { + return false; + } + + LOG(debug, "Finished merge_field for field %s dir %s", indexName.c_str(), _field_dir.c_str()); + + return true; +} + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h new file mode 100644 index 00000000000..31c2818cf17 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -0,0 +1,54 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include +#include +#include + +namespace search { +class IFlushToken; +template class PostingPriorityQueue; +} + +namespace search::index { class FieldLengthInfo; } + +namespace search::diskindex { + +class DictionaryWordReader; +class FieldLengthScanner; +class FieldReader; +class FieldWriter; +class FusionOutputIndex; +class WordNumMapping; + +/* + * Class for merging posting lists for a single field during fusion. + */ +class FieldMerger +{ + using WordNumMappingList = std::vector; + + uint32_t _id; + vespalib::string _field_dir; + const FusionOutputIndex& _fusion_out_index; + + void make_tmp_dirs(); + bool clean_tmp_dirs(); + bool open_input_word_readers(std::vector>& readers, PostingPriorityQueue& heap); + bool read_mapping_files(WordNumMappingList& list); + bool renumber_word_ids(WordNumMappingList& list, uint64_t& numWordIds, const IFlushToken& flush_token); + std::shared_ptr allocate_field_length_scanner(); + bool open_input_field_readers(const WordNumMappingList& list, std::vector>& readers); + bool open_field_writer(FieldWriter& writer, const index::FieldLengthInfo& field_length_info); + bool select_cooked_or_raw_features(FieldReader& reader, FieldWriter& writer); + bool setup_merge_heap(const std::vector>& readers, FieldWriter& writer, PostingPriorityQueue& heap); + bool merge_postings(const WordNumMappingList& list, uint64_t numWordIds, const IFlushToken& flush_token); +public: + FieldMerger(uint32_t id, const FusionOutputIndex& fusion_out_index); + ~FieldMerger(); + bool merge_field(std::shared_ptr flush_token); +}; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 67c92e18b48..5891849959b 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "fusion.h" +#include "fusion_input_index.h" +#include "field_merger.h" #include "fieldreader.h" #include "dictionarywordreader.h" #include "field_length_scanner.h" @@ -46,15 +48,6 @@ namespace search::diskindex { namespace { -vespalib::string -createTmpPath(const vespalib::string & base, uint32_t index) { - vespalib::asciistream os; - os << base; - os << "/tmpindex"; - os << index; - return os.str(); -} - std::vector createInputIndexes(const std::vector & sources, const SelectorArray &selector) { @@ -69,37 +62,11 @@ createInputIndexes(const std::vector & sources, const Selector } -FusionInputIndex::FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray &selector) - : _path(path), - _index(index), - _schema() -{ - vespalib::string fname = path + "/schema.txt"; - if ( ! _schema.loadFromFile(fname)) { - throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str())); - } - if ( ! SchemaUtil::validateSchema(_schema)) { - throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str())); - } - if (!_docIdMapping.readDocIdLimit(path)) { - throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str())); - } - _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index); -} - -FusionInputIndex::~FusionInputIndex() = default; - Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::string & dir, const std::vector & sources, const SelectorArray &selector, bool dynamicKPosIndexFormat, const TuneFileIndexing &tuneFileIndexing, const FileHeaderContext &fileHeaderContext) - : _schema(schema), - _oldIndexes(createInputIndexes(sources, selector)), - _docIdLimit(docIdLimit), - _dynamicKPosIndexFormat(dynamicKPosIndexFormat), - _outDir(dir), - _tuneFileIndexing(tuneFileIndexing), - _fileHeaderContext(fileHeaderContext) + : _fusion_out_index(schema, dir, createInputIndexes(sources, selector), docIdLimit, dynamicKPosIndexFormat, tuneFileIndexing, fileHeaderContext) { if (!readSchemaFiles()) { throw IllegalArgumentException("Cannot read schema files for source indexes"); @@ -108,74 +75,6 @@ Fusion::Fusion(uint32_t docIdLimit, const Schema & schema, const vespalib::strin Fusion::~Fusion() = default; -bool -Fusion::openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - std::vector > & readers, - PostingPriorityQueue &heap) -{ - for (auto & oi : _oldIndexes) { - auto reader(std::make_unique()); - const vespalib::string &tmpindexpath = createTmpPath(dir, oi.getIndex()); - const vespalib::string &oldindexpath = oi.getPath(); - vespalib::string wordMapName = tmpindexpath + "/old2new.dat"; - vespalib::string fieldDir(oldindexpath + "/" + index.getName()); - vespalib::string dictName(fieldDir + "/dictionary"); - const Schema &oldSchema = oi.getSchema(); - if (!index.hasOldFields(oldSchema)) { - continue; // drop data - } - bool res = reader->open(dictName, wordMapName, _tuneFileIndexing._read); - if (!res) { - LOG(error, "Could not open dictionary %s to generate %s", dictName.c_str(), wordMapName.c_str()); - return false; - } - reader->read(); - if (reader->isValid()) { - readers.push_back(std::move(reader)); - heap.initialAdd(readers.back().get()); - } - } - return true; -} - - -bool -Fusion::renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - WordNumMappingList & list, uint64_t & numWordIds, const IFlushToken& flush_token) -{ - vespalib::string indexName = index.getName(); - LOG(debug, "Renumber word IDs for field %s", indexName.c_str()); - - std::vector> readers; - PostingPriorityQueue heap; - WordAggregator out; - - if (!openInputWordReaders(dir, index, readers, heap)) { - return false; - } - heap.merge(out, 4, flush_token); - if (flush_token.stop_requested()) { - return false; - } - assert(heap.empty()); - numWordIds = out.getWordNum(); - - // Close files - for (auto &i : readers) { - i->close(); - } - - // Now read mapping files back into an array - // XXX: avoid this, and instead make the array here - if (!readMappingFiles(dir, &index, list)) { - return false; - } - LOG(debug, "Finished renumbering words IDs for field %s", indexName.c_str()); - - return true; -} - - bool Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr flush_token) { @@ -187,7 +86,8 @@ Fusion::mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr flush_token) -{ - typedef SchemaUtil::IndexIterator IndexIterator; - typedef SchemaUtil::IndexSettings IndexSettings; - - const Schema &schema = getSchema(); - IndexIterator index(schema, id); - const vespalib::string &indexName = index.getName(); - IndexSettings settings = index.getIndexSettings(); - if (settings.hasError()) { - return false; - } - vespalib::string indexDir = _outDir + "/" + indexName; - - if (FileKit::hasStamp(indexDir + "/.mergeocc_done")) { - return true; - } - vespalib::mkdir(indexDir, false); - - LOG(debug, "mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str()); - - makeTmpDirs(indexDir); - - WordNumMappingList list(_oldIndexes.size()); - uint64_t numWordIds(0); - if (!renumberFieldWordIds(indexDir, index, list, numWordIds, *flush_token)) { - if (flush_token->stop_requested()) { - return false; - } - LOG(error, "Could not renumber field word ids for field %s dir %s", indexName.c_str(), indexDir.c_str()); - return false; - } - - // Tokamak - bool res = mergeFieldPostings(index, list, numWordIds, *flush_token); - if (!res) { - if (flush_token->stop_requested()) { - return false; - } - throw IllegalArgumentException(make_string("Could not merge field postings for field %s dir %s", - indexName.c_str(), indexDir.c_str())); - } - if (!FileKit::createStamp(indexDir + "/.mergeocc_done")) { - return false; - } - vespalib::File::sync(indexDir); - - if (!cleanTmpDirs(indexDir)) { - return false; - } - - LOG(debug, "Finished mergeField for field %s dir %s", indexName.c_str(), indexDir.c_str()); - - return true; -} - -template -bool -Fusion::selectCookedOrRawFeatures(Reader &reader, Writer &writer) -{ - bool rawFormatOK = true; - bool cookedFormatOK = true; - PostingListParams featureParams; - PostingListParams outFeatureParams; - vespalib::string cookedFormat; - vespalib::string rawFormat; - - if (!reader.isValid()) { - return true; - } - { - writer.getFeatureParams(featureParams); - cookedFormat = featureParams.getStr("cookedEncoding"); - rawFormat = featureParams.getStr("encoding"); - if (rawFormat == "") { - rawFormatOK = false; // Typically uncompressed file - } - outFeatureParams = featureParams; - } - { - reader.getFeatureParams(featureParams); - if (cookedFormat != featureParams.getStr("cookedEncoding")) { - cookedFormatOK = false; - } - if (rawFormat != featureParams.getStr("encoding")) { - rawFormatOK = false; - } - if (featureParams != outFeatureParams) { - rawFormatOK = false; - } - if (!reader.allowRawFeatures()) { - rawFormatOK = false; // Reader transforms data - } - } - if (!cookedFormatOK) { - LOG(error, "Cannot perform fusion, cooked feature formats don't match"); - return false; - } - if (rawFormatOK) { - featureParams.clear(); - featureParams.set("cooked", false); - reader.setFeatureParams(featureParams); - reader.getFeatureParams(featureParams); - if (featureParams.isSet("cookedEncoding") || - rawFormat != featureParams.getStr("encoding")) { - rawFormatOK = false; - } - if (!rawFormatOK) { - LOG(error, "Cannot perform fusion, raw format setting failed"); - return false; - } - LOG(debug, "Using raw feature format for fusion of posting files"); - } - return true; -} - - -std::shared_ptr -Fusion::allocate_field_length_scanner(const SchemaUtil::IndexIterator &index) -{ - if (index.use_interleaved_features()) { - PosOccFieldsParams fieldsParams; - fieldsParams.setSchemaParams(index.getSchema(), index.getIndex()); - assert(fieldsParams.getNumFields() > 0); - const PosOccFieldParams &fieldParams = fieldsParams.getFieldParams()[0]; - if (fieldParams._hasElements) { - for (const auto &old_index : _oldIndexes) { - const Schema &old_schema = old_index.getSchema(); - if (index.hasOldFields(old_schema) && - !index.has_matching_use_interleaved_features(old_schema)) { - return std::make_shared(_docIdLimit); - } - } - } - } - return std::shared_ptr(); -} - -bool -Fusion::openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, - std::vector > & readers) -{ - auto field_length_scanner = allocate_field_length_scanner(index); - vespalib::string indexName = index.getName(); - for (const auto &oi : _oldIndexes) { - const Schema &oldSchema = oi.getSchema(); - if (!index.hasOldFields(oldSchema)) { - continue; // drop data - } - auto reader = FieldReader::allocFieldReader(index, oldSchema, field_length_scanner); - reader->setup(list[oi.getIndex()], oi.getDocIdMapping()); - if (!reader->open(oi.getPath() + "/" + indexName + "/", _tuneFileIndexing._read)) { - return false; - } - readers.push_back(std::move(reader)); - } - return true; -} - - -bool -Fusion::openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter &writer, const FieldLengthInfo &field_length_info) -{ - vespalib::string dir = _outDir + "/" + index.getName(); - - if (!writer.open(dir + "/", 64, 262144, _dynamicKPosIndexFormat, - index.use_interleaved_features(), index.getSchema(), - index.getIndex(), - field_length_info, - _tuneFileIndexing._write, _fileHeaderContext)) { - throw IllegalArgumentException(make_string("Could not open output posocc + dictionary in %s", dir.c_str())); - } - return true; -} - - -bool -Fusion::setupMergeHeap(const std::vector > & readers, - FieldWriter &writer, PostingPriorityQueue &heap) -{ - for (auto &reader : readers) { - if (!selectCookedOrRawFeatures(*reader, writer)) { - return false; - } - if (reader->isValid()) { - reader->read(); - } - if (reader->isValid()) { - heap.initialAdd(reader.get()); - } - } - return true; -} - - -bool -Fusion::mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token) -{ - std::vector> readers; - PostingPriorityQueue heap; - /* OUTPUT */ - FieldWriter fieldWriter(_docIdLimit, numWordIds); - vespalib::string indexName = index.getName(); - - if (!openInputFieldReaders(index, list, readers)) { - return false; - } - FieldLengthInfo field_length_info; - if (!readers.empty()) { - field_length_info = readers.back()->get_field_length_info(); - } - if (!openFieldWriter(index, fieldWriter, field_length_info)) { - return false; - } - if (!setupMergeHeap(readers, fieldWriter, heap)) { - return false; - } - - heap.merge(fieldWriter, 4, flush_token); - if (flush_token.stop_requested()) { - return false; - } - assert(heap.empty()); - - for (auto &reader : readers) { - if (!reader->close()) { - return false; - } - } - if (!fieldWriter.close()) { - throw IllegalArgumentException(make_string("Could not close output posocc + dictionary in %s/%s", - _outDir.c_str(), indexName.c_str())); - } - return true; -} - - -bool -Fusion::readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list) -{ - for (const auto & oi : _oldIndexes) { - std::vector oldIndexes; - const Schema &oldSchema = oi.getSchema(); - if (!SchemaUtil::getIndexIds(oldSchema, DataType::STRING, oldIndexes)) { - return false; - } - WordNumMapping &wordNumMapping = list[oi.getIndex()]; - if (oldIndexes.empty()) { - wordNumMapping.noMappingFile(); - continue; - } - if (index && !index->hasOldFields(oldSchema)) { - continue; // drop data - } - - // Open word mapping file - vespalib::string old2newname = createTmpPath(dir, oi.getIndex()) + "/old2new.dat"; - wordNumMapping.readMappingFile(old2newname, _tuneFileIndexing._read); - } - - return true; -} - - -void -Fusion::makeTmpDirs(const vespalib::string & dir) -{ - for (const auto & index : _oldIndexes) { - vespalib::mkdir(createTmpPath(dir, index.getIndex()), false); - } -} - -bool -Fusion::cleanTmpDirs(const vespalib::string & dir) -{ - uint32_t i = 0; - for (;;) { - vespalib::string tmpindexpath = createTmpPath(dir, i); - FastOS_StatInfo statInfo; - if (!FastOS_File::Stat(tmpindexpath.c_str(), &statInfo)) { - if (statInfo._error == FastOS_StatInfo::FileNotFound) { - break; - } - LOG(error, "Failed to stat tmpdir %s", tmpindexpath.c_str()); - return false; - } - i++; - } - while (i > 0) { - i--; - vespalib::string tmpindexpath = createTmpPath(dir, i); - search::DirectoryTraverse dt(tmpindexpath.c_str()); - if (!dt.RemoveTree()) { - LOG(error, "Failed to clean tmpdir %s", tmpindexpath.c_str()); - return false; - } - } - return true; -} - - bool Fusion::checkSchemaCompat() { diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index c1acf41a043..22dda4d6edf 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -2,88 +2,38 @@ #pragma once -#include "docidmapper.h" -#include "wordnummapper.h" +#include "fusion_output_index.h" -#include #include -namespace search { template class PostingPriorityQueue; } namespace search { class IFlushToken; +template class PostingPriorityQueue; class TuneFileIndexing; } -namespace search::common { class FileHeaderContext; } -namespace search::index { class FieldLengthInfo; } -namespace search::diskindex { - -class FieldLengthScanner; -class FieldReader; -class FieldWriter; -class DictionaryWordReader; - -class FusionInputIndex -{ -private : - vespalib::string _path; - uint32_t _index; - index::Schema _schema; - DocIdMapping _docIdMapping; - -public: - FusionInputIndex(const vespalib::string &path, uint32_t index, const SelectorArray & selector); - FusionInputIndex(FusionInputIndex &&) = default; - FusionInputIndex & operator = (FusionInputIndex &&) = default; - ~FusionInputIndex(); +namespace vespalib { template class Array; } - const vespalib::string & getPath() const { return _path; } - uint32_t getIndex() const { return _index; } - const DocIdMapping & getDocIdMapping() const { return _docIdMapping; } - const index::Schema &getSchema() const { return _schema; } -}; +namespace search::diskindex { +using SelectorArray = vespalib::Array; +/* + * Class that handles fusion of a set of disk indexes into a new disk + * index. + */ class Fusion { private: using Schema = index::Schema; - using SchemaUtil = index::SchemaUtil; - using WordNumMappingList = std::vector; - bool mergeFields(vespalib::ThreadExecutor & executor, std::shared_ptr flush_token); - bool mergeField(uint32_t id, std::shared_ptr flush_token); - std::shared_ptr allocate_field_length_scanner(const SchemaUtil::IndexIterator &index); - bool openInputFieldReaders(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, - std::vector > & readers); - bool openFieldWriter(const SchemaUtil::IndexIterator &index, FieldWriter & writer, const index::FieldLengthInfo &field_length_info); - bool setupMergeHeap(const std::vector > & readers, - FieldWriter &writer, PostingPriorityQueue &heap); - bool mergeFieldPostings(const SchemaUtil::IndexIterator &index, const WordNumMappingList & list, uint64_t numWordIds, const IFlushToken& flush_token); - bool openInputWordReaders(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - std::vector > &readers, - PostingPriorityQueue &heap); - bool renumberFieldWordIds(const vespalib::string & dir, const SchemaUtil::IndexIterator &index, - WordNumMappingList & list, uint64_t& numWordIds, const IFlushToken& flush_token); - void makeTmpDirs(const vespalib::string & dir); - bool cleanTmpDirs(const vespalib::string & dir); + bool mergeFields(vespalib::ThreadExecutor& executor, std::shared_ptr flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); - template - static bool selectCookedOrRawFeatures(Reader &reader, Writer &writer); - - bool readMappingFiles(const vespalib::string & dir, const SchemaUtil::IndexIterator *index, WordNumMappingList & list); - const Schema &getSchema() const { return _schema; } - - const Schema &_schema; // External ownership - std::vector _oldIndexes; - const uint32_t _docIdLimit; - const bool _dynamicKPosIndexFormat; - vespalib::string _outDir; + const Schema &getSchema() const { return _fusion_out_index.get_schema(); } - const TuneFileIndexing &_tuneFileIndexing; - const common::FileHeaderContext &_fileHeaderContext; + FusionOutputIndex _fusion_out_index; public: Fusion(const Fusion &) = delete; Fusion& operator=(const Fusion &) = delete; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp new file mode 100644 index 00000000000..278d095b639 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.cpp @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "fusion_input_index.h" +#include +#include +#include + +using search::index::SchemaUtil; +using vespalib::IllegalArgumentException; +using vespalib::make_string; + +namespace search::diskindex { + +FusionInputIndex::FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector) + : _path(path), + _index(index), + _schema() +{ + vespalib::string fname = path + "/schema.txt"; + if ( ! _schema.loadFromFile(fname)) { + throw IllegalArgumentException(make_string("Failed loading schema %s", fname.c_str())); + } + if ( ! SchemaUtil::validateSchema(_schema)) { + throw IllegalArgumentException(make_string("Failed validating schema %s", fname.c_str())); + } + if (!_docIdMapping.readDocIdLimit(path)) { + throw IllegalArgumentException(make_string("Cannot determine docIdLimit for old index \"%s\"", path.c_str())); + } + _docIdMapping.setup(_docIdMapping._docIdLimit, &selector, index); +} + +FusionInputIndex::~FusionInputIndex() = default; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h new file mode 100644 index 00000000000..fd7bb8f0256 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_input_index.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "docidmapper.h" +#include +#include + +namespace search::diskindex { + +/* + * Class representing an index used as input for fusion. + */ +class FusionInputIndex +{ +private: + vespalib::string _path; + uint32_t _index; + index::Schema _schema; + DocIdMapping _docIdMapping; + +public: + FusionInputIndex(const vespalib::string& path, uint32_t index, const SelectorArray& selector); + FusionInputIndex(FusionInputIndex&&) = default; + FusionInputIndex & operator = (FusionInputIndex&&) = default; + ~FusionInputIndex(); + + const vespalib::string& getPath() const noexcept { return _path; } + uint32_t getIndex() const noexcept { return _index; } + const DocIdMapping& getDocIdMapping() const noexcept { return _docIdMapping; } + const index::Schema& getSchema() const noexcept { return _schema; } +}; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp new file mode 100644 index 00000000000..66ec0889cbe --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.cpp @@ -0,0 +1,21 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "fusion_output_index.h" +#include "fusion_input_index.h" + +namespace search::diskindex { + +FusionOutputIndex::FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context) + : _schema(schema), + _path(path), + _old_indexes(std::move(old_indexes)), + _doc_id_limit(doc_id_limit), + _dynamic_k_pos_index_format(dynamic_k_pos_index_format), + _tune_file_indexing(tune_file_indexing), + _file_header_context(file_header_context) +{ +} + +FusionOutputIndex::~FusionOutputIndex() = default; + +} diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h new file mode 100644 index 00000000000..c366b111363 --- /dev/null +++ b/searchlib/src/vespa/searchlib/diskindex/fusion_output_index.h @@ -0,0 +1,43 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include + +namespace search { class TuneFileIndexing; } +namespace search::common { class FileHeaderContext; } +namespace search::index { class Schema; } + +namespace search::diskindex { + +class FusionInputIndex; + +/* + * Class representing the portions of fusion output index state needed by + * FieldMerger. + */ +class FusionOutputIndex +{ +private: + const index::Schema& _schema; + const vespalib::string _path; + const std::vector _old_indexes; + const uint32_t _doc_id_limit; + const bool _dynamic_k_pos_index_format; + const TuneFileIndexing& _tune_file_indexing; + const common::FileHeaderContext& _file_header_context; +public: + FusionOutputIndex(const index::Schema& schema, const vespalib::string& path, std::vector old_indexes, uint32_t doc_id_limit, bool dynamic_k_pos_index_format, const TuneFileIndexing& tune_file_indexing, const common::FileHeaderContext& file_header_context); + ~FusionOutputIndex(); + + const index::Schema& get_schema() const noexcept { return _schema; } + const vespalib::string& get_path() const noexcept { return _path; } + const std::vector& get_old_indexes() const noexcept { return _old_indexes; } + uint32_t get_doc_id_limit() const noexcept { return _doc_id_limit; } + bool get_dynamic_k_pos_index_format() const noexcept { return _dynamic_k_pos_index_format; } + const TuneFileIndexing& get_tune_file_indexing() const noexcept { return _tune_file_indexing; } + const common::FileHeaderContext& get_file_header_context() const noexcept { return _file_header_context; } +}; + +} -- cgit v1.2.3