summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2017-03-23 18:33:56 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2017-03-23 18:33:56 +0000
commita4eb2a0311c15d9d8ed1ed8534737b9b9cea23e3 (patch)
tree9f64e18e7a656616612a14bc742e2984d384164d /searchcore
parentd16f535b772202432c8d805a5333ae6830286825 (diff)
Flush populated attributes to disk as part of reprocessing.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp4
-rw-r--r--searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp2
-rw-r--r--searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp7
14 files changed, 78 insertions, 18 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp b/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp
index e28dd24a454..19bfedfcd31 100644
--- a/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_populator/attribute_populator_test.cpp
@@ -68,7 +68,7 @@ struct Fixture
_mgr(new AttributeManager(TEST_DIR, "test.subdb",
TuneFileAttributes(),
_fileHeader, _attributeFieldWriter, _hwInfo)),
- _pop(_mgr, 1, "test"),
+ _pop(_mgr, 1, "test", CREATE_SERIAL_NUM),
_ctx()
{
_mgr->addAttribute("a1", AVConfig(AVBasicType::INT32),
@@ -93,6 +93,8 @@ TEST_F("require that reprocess with document populates attribute", Fixture)
EXPECT_EQUAL(7u, attr->get()->getNumDocs());
EXPECT_EQUAL(44, attr->get()->getInt(6));
EXPECT_EQUAL(2u, attr->get()->getStatus().getLastSyncToken());
+ f._pop.done();
+ EXPECT_EQUAL(CREATE_SERIAL_NUM, attr->get()->getStatus().getLastSyncToken());
}
TEST_MAIN()
diff --git a/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp b/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp
index 36319ae77d5..89024e7cd97 100644
--- a/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp
+++ b/searchcore/src/tests/proton/reprocessing/attribute_reprocessing_initializer/attribute_reprocessing_initializer_test.cpp
@@ -121,7 +121,7 @@ struct Fixture
void init() {
_initializer.reset(new AttributeReprocessingInitializer
(ARIConfig(_newCfg._mgr, _newCfg._schema, _newCfg._inspector),
- ARIConfig(_oldCfg._mgr, _oldCfg._schema, _oldCfg._inspector), "test"));
+ ARIConfig(_oldCfg._mgr, _oldCfg._schema, _oldCfg._inspector), "test", INIT_SERIAL_NUM));
_initializer->initialize(_handler);
}
Fixture &addOldConfig(const StringVector &fields,
diff --git a/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp b/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp
index 483c1c3b977..1848ad8fc74 100644
--- a/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp
+++ b/searchcore/src/tests/proton/reprocessing/document_reprocessing_handler/document_reprocessing_handler_test.cpp
@@ -23,6 +23,7 @@ struct MyProcessor : public ReprocessingType
_lid = lid;
_docId = doc.getId();
}
+ virtual void done() { }
};
typedef MyProcessor<IReprocessingReader, const Document &> MyReader;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
index 226a79b6d49..dadf5d41912 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.cpp
@@ -15,6 +15,7 @@ namespace proton {
search::SerialNum
AttributePopulator::nextSerialNum()
{
+ assert(_currSerialNum <= _configSerialNum);
return _currSerialNum++;
}
@@ -32,10 +33,12 @@ AttributePopulator::getNames() const
AttributePopulator::AttributePopulator(const proton::IAttributeManager::SP &mgr,
search::SerialNum initSerialNum,
- const vespalib::string &subDbName)
+ const vespalib::string &subDbName,
+ search::SerialNum configSerialNum)
: _writer(mgr),
_initSerialNum(initSerialNum),
_currSerialNum(initSerialNum),
+ _configSerialNum(configSerialNum),
_subDbName(subDbName)
{
if (LOG_WOULD_LOG(event)) {
@@ -58,4 +61,18 @@ AttributePopulator::handleExisting(uint32_t lid, const document::Document &doc)
_writer.put(serialNum, doc, lid, true, std::shared_ptr<IDestructorCallback>());
}
+void
+AttributePopulator::done()
+{
+ auto mgr = _writer.getAttributeManager();
+ auto flushTargets = mgr->getFlushTargets();
+ for (const auto &flushTarget : flushTargets) {
+ assert(flushTarget->getFlushedSerialNum() < _configSerialNum);
+ auto task = flushTarget->initFlush(_configSerialNum);
+ assert(task);
+ task->run();
+ assert(flushTarget->getFlushedSerialNum() == _configSerialNum);
+ }
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h
index b6c53af6bac..e918499fb5b 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_populator.h
@@ -16,6 +16,7 @@ private:
AttributeWriter _writer;
search::SerialNum _initSerialNum;
search::SerialNum _currSerialNum;
+ search::SerialNum _configSerialNum;
vespalib::string _subDbName;
search::SerialNum nextSerialNum();
@@ -27,13 +28,15 @@ public:
AttributePopulator(const proton::IAttributeManager::SP &mgr,
search::SerialNum initSerialNum,
- const vespalib::string &subDbName);
+ const vespalib::string &subDbName,
+ search::SerialNum configSerialNum);
~AttributePopulator();
const IAttributeWriter &getWriter() const { return _writer; }
// Implements IReprocessingReader
- virtual void handleExisting(uint32_t lid, const document::Document &doc);
+ virtual void handleExisting(uint32_t lid, const document::Document &doc) override;
+ virtual void done() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
index d6540331345..840709a7732 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
@@ -9,6 +9,12 @@ using search::AttributeGuard;
namespace proton {
+namespace {
+
+const vespalib::string FLUSH_TARGET_NAME_PREFIX("attribute.");
+
+}
+
bool
FilterAttributeManager::acceptAttribute(const vespalib::string &name) const
{
@@ -43,10 +49,24 @@ IAttributeManager::SP
FilterAttributeManager::create(const AttributeCollectionSpec &) const {
throw vespalib::IllegalArgumentException("Not implemented");
}
+
std::vector<searchcorespi::IFlushTarget::SP>
FilterAttributeManager::getFlushTargets() const {
- throw vespalib::IllegalArgumentException("Not implemented");
+ std::vector<searchcorespi::IFlushTarget::SP> completeList = _mgr->getFlushTargets();
+ std::vector<searchcorespi::IFlushTarget::SP> list;
+ list.reserve(completeList.size());
+ for (const auto &flushTarget : completeList) {
+ const vespalib::string &targetName = flushTarget->getName();
+ if (targetName.substr(0, FLUSH_TARGET_NAME_PREFIX.size()) == FLUSH_TARGET_NAME_PREFIX) {
+ vespalib::string name = targetName.substr(FLUSH_TARGET_NAME_PREFIX.size());
+ if (acceptAttribute(name)) {
+ list.push_back(flushTarget);
+ }
+ }
+ }
+ return list;
}
+
search::SerialNum
FilterAttributeManager::getOldestFlushedSerialNumber() const {
throw vespalib::IllegalArgumentException("Not implemented");
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp
index d1cb05292c1..64587f03883 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.cpp
@@ -36,7 +36,8 @@ bool fastPartialUpdateAttribute(const schema::DataType &attrType) {
FilterAttributeManager::AttributeSet
getAttributeSetToPopulate(const ARIConfig &newCfg,
- const ARIConfig &oldCfg)
+ const ARIConfig &oldCfg,
+ search::SerialNum serialNum)
{
FilterAttributeManager::AttributeSet attrsToPopulate;
std::vector<AttributeGuard> attrList;
@@ -45,7 +46,8 @@ getAttributeSetToPopulate(const ARIConfig &newCfg,
const vespalib::string &name = guard->getName();
bool inOldAttrMgr = oldCfg.getAttrMgr()->getAttribute(name)->valid();
bool inOldSchema = oldCfg.getInspector()->hasField(name);
- bool populateAttribute = !inOldAttrMgr && inOldSchema;
+ search::SerialNum flushedSerialNum = newCfg.getAttrMgr()->getFlushedSerialNum(name);
+ bool populateAttribute = !inOldAttrMgr && inOldSchema && (flushedSerialNum < serialNum);
LOG(debug, "getAttributeSetToPopulate(): name='%s', inOldAttrMgr=%s, inOldSchema=%s, populate=%s",
name.c_str(), toStr(inOldAttrMgr), toStr(inOldSchema), toStr(populateAttribute));
if (populateAttribute) {
@@ -58,15 +60,16 @@ getAttributeSetToPopulate(const ARIConfig &newCfg,
IReprocessingReader::SP
getAttributesToPopulate(const ARIConfig &newCfg,
const ARIConfig &oldCfg,
- const vespalib::string &subDbName)
+ const vespalib::string &subDbName,
+ search::SerialNum serialNum)
{
FilterAttributeManager::AttributeSet attrsToPopulate =
- getAttributeSetToPopulate(newCfg, oldCfg);
+ getAttributeSetToPopulate(newCfg, oldCfg, serialNum);
if (!attrsToPopulate.empty()) {
return IReprocessingReader::SP(new AttributePopulator
(IAttributeManager::SP(new FilterAttributeManager
(attrsToPopulate, newCfg.getAttrMgr())),
- ATTRIBUTE_INIT_SERIAL, subDbName));
+ ATTRIBUTE_INIT_SERIAL, subDbName, serialNum));
}
return IReprocessingReader::SP();
}
@@ -119,8 +122,9 @@ getFieldsToPopulate(const ARIConfig &newCfg,
AttributeReprocessingInitializer::
AttributeReprocessingInitializer(const Config &newCfg,
const Config &oldCfg,
- const vespalib::string &subDbName)
- : _attrsToPopulate(getAttributesToPopulate(newCfg, oldCfg, subDbName)),
+ const vespalib::string &subDbName,
+ search::SerialNum serialNum)
+ : _attrsToPopulate(getAttributesToPopulate(newCfg, oldCfg, subDbName, serialNum)),
_fieldsToPopulate(getFieldsToPopulate(newCfg, oldCfg, subDbName))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h
index f1837a6dbcb..bb2b4de4897 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h
@@ -46,7 +46,8 @@ private:
public:
AttributeReprocessingInitializer(const Config &newCfg,
const Config &oldCfg,
- const vespalib::string &subDbName);
+ const vespalib::string &subDbName,
+ search::SerialNum serialNum);
// Implements IReprocessingInitializer
virtual bool hasReprocessors() const;
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp
index 4da9bdf76d9..ff538ea8c68 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.cpp
@@ -45,4 +45,12 @@ DocumentReprocessingHandler::visit(uint32_t lid)
(void) lid;
}
+void
+DocumentReprocessingHandler::done()
+{
+ for (const auto &reader : _readers) {
+ reader->done();
+ }
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h
index 32e472beaa0..f950e3b61a9 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/document_reprocessing_handler.h
@@ -70,6 +70,7 @@ public:
virtual void visit(uint32_t lid);
+ void done();
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h b/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h
index e265befe257..f8c800f6e2e 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/i_reprocessing_reader.h
@@ -20,6 +20,7 @@ struct IReprocessingReader
* Handle the given existing document.
*/
virtual void handleExisting(uint32_t lid, const document::Document &doc) = 0;
+ virtual void done() = 0;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
index 9084c3d8e63..59b3966454a 100644
--- a/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reprocessing/reprocess_documents_task.cpp
@@ -53,6 +53,7 @@ ReprocessDocumentsTask::run()
*this,
*_docTypeRepo);
}
+ _handler.done();
ts = fastos::ClockSystem::now();
int64_t elapsedTime = ts.ms() - _startTime;
EventLogger::reprocessDocumentsComplete(_subDbName,
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
index 2698ffdda69..5f77400d7fd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.cpp
@@ -64,7 +64,7 @@ FastAccessDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig,
IDocumentTypeInspector::SP(new DocumentTypeInspector(*newDocType))),
ARIConfig(oldView->getAttributeWriter()->getAttributeManager(), *oldConfig.getSchemaSP(),
IDocumentTypeInspector::SP(new DocumentTypeInspector(*oldDocType))),
- _subDbName));
+ _subDbName, attrSpec.getCurrentSerialNum()));
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
index 202efae5397..c6b3a86239c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_doc_subdb_configurer.cpp
@@ -164,7 +164,8 @@ createAttributeReprocessingInitializer(const DocumentDBConfig &newConfig,
const IAttributeManager::SP &newAttrMgr,
const DocumentDBConfig &oldConfig,
const IAttributeManager::SP &oldAttrMgr,
- const vespalib::string &subDbName)
+ const vespalib::string &subDbName,
+ search::SerialNum serialNum)
{
const document::DocumentType *newDocType = newConfig.getDocumentType();
const document::DocumentType *oldDocType = oldConfig.getDocumentType();
@@ -175,7 +176,7 @@ createAttributeReprocessingInitializer(const DocumentDBConfig &newConfig,
IDocumentTypeInspector::SP(new DocumentTypeInspector(*newDocType))),
ARIConfig(oldAttrMgr, *oldConfig.getSchemaSP(),
IDocumentTypeInspector::SP(new DocumentTypeInspector(*oldDocType))),
- subDbName));
+ subDbName, serialNum));
}
}
@@ -216,7 +217,7 @@ SearchableDocSubDBConfigurer::reconfigure(const DocumentDBConfig &newConfig,
attrWriter = newAttrWriter;
shouldFeedViewChange = true;
initializer.reset(createAttributeReprocessingInitializer(newConfig, newAttrMgr,
- oldConfig, oldAttrMgr, _subDbName).release());
+ oldConfig, oldAttrMgr, _subDbName, attrSpec.getCurrentSerialNum()).release());
}
ISummaryManager::ISummarySetup::SP sumSetup =