diff options
author | Tor Egge <Tor.Egge@yahoo-inc.com> | 2017-03-23 18:33:56 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@yahoo-inc.com> | 2017-03-23 18:33:56 +0000 |
commit | a4eb2a0311c15d9d8ed1ed8534737b9b9cea23e3 (patch) | |
tree | 9f64e18e7a656616612a14bc742e2984d384164d /searchcore/src | |
parent | d16f535b772202432c8d805a5333ae6830286825 (diff) |
Flush populated attributes to disk as part of reprocessing.
Diffstat (limited to 'searchcore/src')
14 files changed, 78 insertions, 18 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp b/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp index e28dd24a454..19bfedfcd31 100644 --- a/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp @@ -68,7 +68,7 @@ struct Fixture _mgr(new AttributeManager(TEST_DIR, "test.subdb", TuneFileAttributes(), _fileHeader, _attributeFieldWriter, _hwInfo)), - _pop(_mgr, 1, "test"), + _pop(_mgr, 1, "test", CREATE_SERIAL_NUM), _ctx() { _mgr->addAttribute("a1", AVConfig(AVBasicType::INT32), @@ -93,6 +93,8 @@ TEST_F("require that reprocess with document populates attribute", Fixture) EXPECT_EQUAL(7u, attr->get()->getNumDocs()); EXPECT_EQUAL(44, attr->get()->getInt(6)); EXPECT_EQUAL(2u, attr->get()->getStatus().getLastSyncToken()); + f._pop.done(); + EXPECT_EQUAL(CREATE_SERIAL_NUM, attr->get()->getStatus().getLastSyncToken()); } TEST_MAIN() diff --git a/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp b/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp index 36319ae77d5..89024e7cd97 100644 --- a/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp +++ b/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp @@ -121,7 +121,7 @@ struct Fixture void init() { _initializer.reset(new AttributeReprocessingInitializer (ARIConfig(_newCfg._mgr, _newCfg._schema, _newCfg._inspector), - ARIConfig(_oldCfg._mgr, _oldCfg._schema, _oldCfg._inspector), "test")); + ARIConfig(_oldCfg._mgr, _oldCfg._schema, _oldCfg._inspector), "test", INIT_SERIAL_NUM)); _initializer->initialize(_handler); } Fixture &addOldConfig(const StringVector &fields, diff --git a/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp b/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp index 483c1c3b977..1848ad8fc74 100644 --- a/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp +++ b/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp @@ -23,6 +23,7 @@ struct MyProcessor : public ReprocessingType _lid = lid; _docId = doc.getId(); } + virtual void done() { } }; typedef MyProcessor<IReprocessingReader, const Document &> MyReader; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp index 226a79b6d49..dadf5d41912 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp @@ -15,6 +15,7 @@ namespace proton { search::SerialNum AttributePopulator::nextSerialNum() { + assert(_currSerialNum <= _configSerialNum); return _currSerialNum++; } @@ -32,10 +33,12 @@ AttributePopulator::getNames() const AttributePopulator::AttributePopulator(const proton::IAttributeManager::SP &mgr, search::SerialNum initSerialNum, - const vespalib::string &subDbName) + const vespalib::string &subDbName, + search::SerialNum configSerialNum) : _writer(mgr), _initSerialNum(initSerialNum), _currSerialNum(initSerialNum), + _configSerialNum(configSerialNum), _subDbName(subDbName) { if (LOG_WOULD_LOG(event)) { @@ -58,4 +61,18 @@ AttributePopulator::handleExisting(uint32_t lid, const document::Document &doc) _writer.put(serialNum, doc, lid, true, std::shared_ptr<IDestructorCallback>()); } +void +AttributePopulator::done() +{ + auto mgr = _writer.getAttributeManager(); + auto flushTargets = mgr->getFlushTargets(); + for (const auto &flushTarget : flushTargets) { + assert(flushTarget->getFlushedSerialNum() < _configSerialNum); + auto task = flushTarget->initFlush(_configSerialNum); + assert(task); + task->run(); + assert(flushTarget->getFlushedSerialNum() == _configSerialNum); + } +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h index b6c53af6bac..e918499fb5b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h @@ -16,6 +16,7 @@ private: AttributeWriter _writer; search::SerialNum _initSerialNum; search::SerialNum _currSerialNum; + search::SerialNum _configSerialNum; vespalib::string _subDbName; search::SerialNum nextSerialNum(); @@ -27,13 +28,15 @@ public: AttributePopulator(const proton::IAttributeManager::SP &mgr, search::SerialNum initSerialNum, - const vespalib::string &subDbName); + const vespalib::string &subDbName, + search::SerialNum configSerialNum); ~AttributePopulator(); const IAttributeWriter &getWriter() const { return _writer; } // Implements IReprocessingReader - virtual void handleExisting(uint32_t lid, const document::Document &doc); + virtual void handleExisting(uint32_t lid, const document::Document &doc) override; + virtual void done() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp index d6540331345..840709a7732 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp @@ -9,6 +9,12 @@ using search::AttributeGuard; namespace proton { +namespace { + +const vespalib::string FLUSH_TARGET_NAME_PREFIX("attribute."); + +} + bool FilterAttributeManager::acceptAttribute(const vespalib::string &name) const { @@ -43,10 +49,24 @@ IAttributeManager::SP FilterAttributeManager::create(const AttributeCollectionSpec &) const { throw vespalib::IllegalArgumentException("Not implemented"); } + std::vector<searchcorespi::IFlushTarget::SP> FilterAttributeManager::getFlushTargets() const { - throw vespalib::IllegalArgumentException("Not implemented"); + std::vector<searchcorespi::IFlushTarget::SP> completeList = _mgr->getFlushTargets(); + std::vector<searchcorespi::IFlushTarget::SP> list; + list.reserve(completeList.size()); + for (const auto &flushTarget : completeList) { + const vespalib::string &targetName = flushTarget->getName(); + if (targetName.substr(0, FLUSH_TARGET_NAME_PREFIX.size()) == FLUSH_TARGET_NAME_PREFIX) { + vespalib::string name = targetName.substr(FLUSH_TARGET_NAME_PREFIX.size()); + if (acceptAttribute(name)) { + list.push_back(flushTarget); + } + } + } + return list; } + search::SerialNum FilterAttributeManager::getOldestFlushedSerialNumber() const { throw vespalib::IllegalArgumentException("Not implemented"); diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp index d1cb05292c1..64587f03883 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp @@ -36,7 +36,8 @@ bool fastPartialUpdateAttribute(const schema::DataType &attrType) { FilterAttributeManager::AttributeSet getAttributeSetToPopulate(const ARIConfig &newCfg, - const ARIConfig &oldCfg) + const ARIConfig &oldCfg, + search::SerialNum serialNum) { FilterAttributeManager::AttributeSet attrsToPopulate; std::vector<AttributeGuard> attrList; @@ -45,7 +46,8 @@ getAttributeSetToPopulate(const ARIConfig &newCfg, const vespalib::string &name = guard->getName(); bool inOldAttrMgr = oldCfg.getAttrMgr()->getAttribute(name)->valid(); bool inOldSchema = oldCfg.getInspector()->hasField(name); - bool populateAttribute = !inOldAttrMgr && inOldSchema; + search::SerialNum flushedSerialNum = newCfg.getAttrMgr()->getFlushedSerialNum(name); + bool populateAttribute = !inOldAttrMgr && inOldSchema && (flushedSerialNum < serialNum); LOG(debug, "getAttributeSetToPopulate(): name='%s', inOldAttrMgr=%s, inOldSchema=%s, populate=%s", name.c_str(), toStr(inOldAttrMgr), toStr(inOldSchema), toStr(populateAttribute)); if (populateAttribute) { @@ -58,15 +60,16 @@ getAttributeSetToPopulate(const ARIConfig &newCfg, IReprocessingReader::SP getAttributesToPopulate(const ARIConfig &newCfg, const ARIConfig &oldCfg, - const vespalib::string &subDbName) + const vespalib::string &subDbName, + search::SerialNum serialNum) { FilterAttributeManager::AttributeSet attrsToPopulate = - getAttributeSetToPopulate(newCfg, oldCfg); + getAttributeSetToPopulate(newCfg, oldCfg, serialNum); if (!attrsToPopulate.empty()) { return IReprocessingReader::SP(new AttributePopulator (IAttributeManager::SP(new FilterAttributeManager (attrsToPopulate, newCfg.getAttrMgr())), - ATTRIBUTE_INIT_SERIAL, subDbName)); + ATTRIBUTE_INIT_SERIAL, subDbName, serialNum)); } return IReprocessingReader::SP(); } @@ -119,8 +122,9 @@ getFieldsToPopulate(const ARIConfig &newCfg, AttributeReprocessingInitializer:: AttributeReprocessingInitializer(const Config &newCfg, const Config &oldCfg, - const vespalib::string &subDbName) - : _attrsToPopulate(getAttributesToPopulate(newCfg, oldCfg, subDbName)), + const vespalib::string &subDbName, + search::SerialNum serialNum) + : _attrsToPopulate(getAttributesToPopulate(newCfg, oldCfg, subDbName, serialNum)), _fieldsToPopulate(getFieldsToPopulate(newCfg, oldCfg, subDbName)) { } diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h index f1837a6dbcb..bb2b4de4897 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h @@ -46,7 +46,8 @@ private: public: AttributeReprocessingInitializer(const Config &newCfg, const Config &oldCfg, - const vespalib::string &subDbName); + const vespalib::string &subDbName, + search::SerialNum serialNum); // Implements IReprocessingInitializer virtual bool hasReprocessors() const; diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp index 4da9bdf76d9..ff538ea8c68 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp @@ -45,4 +45,12 @@ DocumentReprocessingHandler::visit(uint32_t lid) (void) lid; } +void +DocumentReprocessingHandler::done() +{ + for (const auto &reader : _readers) { + reader->done(); + } +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h index 32e472beaa0..f950e3b61a9 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h @@ -70,6 +70,7 @@ public: virtual void visit(uint32_t lid); + void done(); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h b/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h index e265befe257..f8c800f6e2e 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h @@ -20,6 +20,7 @@ struct IReprocessingReader * Handle the given existing document. */ virtual void handleExisting(uint32_t lid, const document::Document &doc) = 0; + virtual void done() = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp index 9084c3d8e63..59b3966454a 100644 --- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp +++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp @@ -53,6 +53,7 @@ ReprocessDocumentsTask::run() *this, *_docTypeRepo); } + _handler.done(); ts = fastos::ClockSystem::now(); int64_t elapsedTime = ts.ms() - _startTime; EventLogger::reprocessDocumentsComplete(_subDbName, diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp index 2698ffdda69..5f77400d7fd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp @@ -64,7 +64,7 @@ FastAccessDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig, IDocumentTypeInspector::SP(new DocumentTypeInspector(*newDocType))), ARIConfig(oldView->getAttributeWriter()->getAttributeManager(), *oldConfig.getSchemaSP(), IDocumentTypeInspector::SP(new DocumentTypeInspector(*oldDocType))), - _subDbName)); + _subDbName, attrSpec.getCurrentSerialNum())); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp index 202efae5397..c6b3a86239c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp @@ -164,7 +164,8 @@ createAttributeReprocessingInitializer(const DocumentDBConfig &newConfig, const IAttributeManager::SP &newAttrMgr, const DocumentDBConfig &oldConfig, const IAttributeManager::SP &oldAttrMgr, - const vespalib::string &subDbName) + const vespalib::string &subDbName, + search::SerialNum serialNum) { const document::DocumentType *newDocType = newConfig.getDocumentType(); const document::DocumentType *oldDocType = oldConfig.getDocumentType(); @@ -175,7 +176,7 @@ createAttributeReprocessingInitializer(const DocumentDBConfig &newConfig, IDocumentTypeInspector::SP(new DocumentTypeInspector(*newDocType))), ARIConfig(oldAttrMgr, *oldConfig.getSchemaSP(), IDocumentTypeInspector::SP(new DocumentTypeInspector(*oldDocType))), - subDbName)); + subDbName, serialNum)); } } @@ -216,7 +217,7 @@ SearchableDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig, attrWriter = newAttrWriter; shouldFeedViewChange = true; initializer.reset(createAttributeReprocessingInitializer(newConfig, newAttrMgr, - oldConfig, oldAttrMgr, _subDbName).release()); + oldConfig, oldAttrMgr, _subDbName, attrSpec.getCurrentSerialNum()).release()); } ISummaryManager::ISummarySetup::SP sumSetup = |