diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-09 11:35:57 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-09 11:43:20 +0100 |
commit | 239ee06aa22a96c42d724a642c65bd07b9bfa36a (patch) | |
tree | dcf2699187e517ae2402221b347a09319b6fdf4b /searchcore | |
parent | f5e183850fd7f995bb5c481214c8c032edfc6de5 (diff) |
Extract field value in attribute writer thread.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp | 140 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h | 9 |
2 files changed, 86 insertions, 63 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 57ab149404d..587ec8dda1f 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -63,7 +63,7 @@ AttributeWriter::WriteField::WriteField(AttributeVector &attribute) AttributeWriter::WriteField::~WriteField() = default; void -AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) +AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) const { const vespalib::string &name = _attribute.getName(); FieldPath fp; @@ -80,6 +80,8 @@ AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) AttributeWriter::WriteContext::WriteContext(ExecutorId executorId) noexcept : _executorId(executorId), _fields(), + _data_type(nullptr), + _two_phase_put_field_path(), _hasStructFieldAttribute(false), _use_two_phase_put(false) { @@ -108,10 +110,18 @@ AttributeWriter::WriteContext::add(AttributeVector &attr) } void -AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType) -{ - for (auto &field : _fields) { - field.buildFieldPath(docType); +AttributeWriter::WriteContext::consider_build_field_paths(const Document& doc) const +{ + auto data_type = doc.getDataType(); + if (_data_type != data_type) { + _data_type = data_type; + auto& doc_type = doc.getType(); + for (auto &field : _fields) { + field.buildFieldPath(doc_type); + } + if (_use_two_phase_put) { + _two_phase_put_field_path = std::make_shared<const FieldPath>(_fields[0].getFieldPath()); + } } } @@ -154,18 +164,43 @@ applyPutToAttribute(SerialNum serialNum, const FieldValue::UP &fieldValue, Docum attr.commitIfChangeVectorTooLarge(); } +class FieldValueAndPrepareResult { + std::unique_ptr<const FieldValue> _field_value; + std::unique_ptr<PrepareResult> _prepare_result; +public: + FieldValueAndPrepareResult(std::unique_ptr<const FieldValue> field_value, + std::unique_ptr<PrepareResult> prepare_result) + : _field_value(std::move(field_value)), + _prepare_result(std::move(prepare_result)) + { + } + FieldValueAndPrepareResult() + : _field_value(), + _prepare_result() + { + } + ~FieldValueAndPrepareResult(); + FieldValueAndPrepareResult(FieldValueAndPrepareResult&&); + const std::unique_ptr<const FieldValue>& get_field_value() const noexcept { return _field_value; } + std::unique_ptr<PrepareResult> steal_prepare_result() { return std::move(_prepare_result); } +}; + +FieldValueAndPrepareResult::~FieldValueAndPrepareResult() = default; + +FieldValueAndPrepareResult::FieldValueAndPrepareResult(FieldValueAndPrepareResult&&) = default; + void complete_put_to_attribute(SerialNum serial_num, uint32_t docid, AttributeVector& attr, - const FieldValue::SP& field_value, - std::future<std::unique_ptr<PrepareResult>>& result_future, + std::future<FieldValueAndPrepareResult> result_future, AttributeWriter::OnWriteDoneType) { ensureLidSpace(serial_num, docid, attr); - if (field_value.get()) { - auto result = result_future.get(); - AttributeUpdater::complete_set_value(attr, docid, *field_value, std::move(result)); + auto result = result_future.get(); + auto& field_value = result.get_field_value(); + if (field_value) { + AttributeUpdater::complete_set_value(attr, docid, *field_value, result.steal_prepare_result()); } else { attr.clearDoc(docid); } @@ -303,31 +338,21 @@ class PutTask : public vespalib::Executor::Task const uint32_t _lid; const bool _allAttributes; std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone; - std::shared_ptr<DocumentFieldExtractor> _fieldExtractor; - std::vector<FieldValue::UP> _fieldValues; + const Document& _doc; public: - PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, std::shared_ptr<DocumentFieldExtractor> fieldExtractor, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone); + PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document& doc, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone); ~PutTask() override; void run() override; }; -PutTask::PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, std::shared_ptr<DocumentFieldExtractor> fieldExtractor, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone) +PutTask::PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document& doc, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone) : _wc(wc), _serialNum(serialNum), _lid(lid), _allAttributes(allAttributes), _onWriteDone(onWriteDone), - _fieldExtractor(std::move(fieldExtractor)), - _fieldValues() + _doc(doc) { - const auto &fields = _wc.getFields(); - _fieldValues.reserve(fields.size()); - for (const auto &field : fields) { - if (_allAttributes || field.isStructFieldAttribute()) { - FieldValue::UP fv = _fieldExtractor->getFieldValue(field.getFieldPath()); - _fieldValues.emplace_back(std::move(fv)); - } - } } PutTask::~PutTask() = default; @@ -335,33 +360,35 @@ PutTask::~PutTask() = default; void PutTask::run() { - uint32_t fieldId = 0; + _wc.consider_build_field_paths(_doc); + DocumentFieldExtractor field_extractor(_doc); const auto &fields = _wc.getFields(); for (auto field : fields) { if (_allAttributes || field.isStructFieldAttribute()) { AttributeVector &attr = field.getAttribute(); if (attr.getStatus().getLastSyncToken() < _serialNum) { - applyPutToAttribute(_serialNum, _fieldValues[fieldId], _lid, attr, _onWriteDone); + auto fv = field_extractor.getFieldValue(field.getFieldPath()); + applyPutToAttribute(_serialNum, fv, _lid, attr, _onWriteDone); } - ++fieldId; } } } - class PreparePutTask : public vespalib::Executor::Task { private: const SerialNum _serial_num; const uint32_t _docid; AttributeVector& _attr; - FieldValue::SP _field_value; - std::promise<std::unique_ptr<PrepareResult>> _result_promise; + std::shared_ptr<const FieldPath> _field_path; + const Document* const _doc; + std::unique_ptr<FieldValue> _field_value; + std::promise<FieldValueAndPrepareResult> _result_promise; public: PreparePutTask(SerialNum serial_num, uint32_t docid, - const AttributeWriter::WriteField& field, - std::shared_ptr<DocumentFieldExtractor> field_extractor); + const AttributeWriter::WriteContext& wc, + const Document& doc); PreparePutTask(SerialNum serial_num, uint32_t docid, AttributeVector& attr, @@ -371,25 +398,23 @@ public: SerialNum serial_num() const { return _serial_num; } uint32_t docid() const { return _docid; } AttributeVector& attr() { return _attr; } - FieldValue::SP field_value() { return _field_value; } - std::future<std::unique_ptr<PrepareResult>> result_future() { + std::future<FieldValueAndPrepareResult> result_future() { return _result_promise.get_future(); } }; PreparePutTask::PreparePutTask(SerialNum serial_num, uint32_t docid, - const AttributeWriter::WriteField& field, - std::shared_ptr<DocumentFieldExtractor> field_extractor) + const AttributeWriter::WriteContext& wc, + const Document& doc) : _serial_num(serial_num), _docid(docid), - _attr(field.getAttribute()), + _attr(wc.getFields()[0].getAttribute()), + _field_path(wc.get_two_phase_put_field_path()), + _doc(&doc), _field_value(), _result_promise() { - // Note: No need to store the field extractor as we are not extracting struct fields. - auto value = field_extractor->getFieldValue(field.getFieldPath()); - _field_value.reset(value.release()); } PreparePutTask::PreparePutTask(SerialNum serial_num, @@ -399,6 +424,8 @@ PreparePutTask::PreparePutTask(SerialNum serial_num, : _serial_num(serial_num), _docid(docid), _attr(attr), + _field_path(), + _doc(nullptr), _field_value(field_value.clone()), _result_promise() { @@ -410,8 +437,15 @@ void PreparePutTask::run() { if (_attr.getStatus().getLastSyncToken() < _serial_num) { + if (_field_path) { + DocumentFieldExtractor field_extractor(*_doc); + _field_value = field_extractor.getFieldValue(*_field_path); + } if (_field_value.get()) { - _result_promise.set_value(AttributeUpdater::prepare_set_value(_attr, _docid, *_field_value)); + auto& fv = *_field_value; + _result_promise.set_value(FieldValueAndPrepareResult(std::move(_field_value), AttributeUpdater::prepare_set_value(_attr, _docid, fv))); + } else { + _result_promise.set_value(FieldValueAndPrepareResult()); } } } @@ -422,7 +456,7 @@ private: const uint32_t _docid; AttributeVector& _attr; FieldValue::SP _field_value; - std::future<std::unique_ptr<PrepareResult>> _result_future; + std::future<FieldValueAndPrepareResult> _result_future; std::remove_reference_t<AttributeWriter::OnWriteDoneType> _on_write_done; public: @@ -437,7 +471,6 @@ CompletePutTask::CompletePutTask(PreparePutTask& prepare_task, : _serial_num(prepare_task.serial_num()), _docid(prepare_task.docid()), _attr(prepare_task.attr()), - _field_value(prepare_task.field_value()), _result_future(prepare_task.result_future()), _on_write_done(on_write_done) { @@ -449,7 +482,7 @@ void CompletePutTask::run() { if (_attr.getStatus().getLastSyncToken() < _serial_num) { - complete_put_to_attribute(_serial_num, _docid, _attr, _field_value, _result_future, _on_write_done); + complete_put_to_attribute(_serial_num, _docid, _attr, std::move(_result_future), _on_write_done); } } @@ -587,33 +620,20 @@ AttributeWriter::setupWriteContexts() } void -AttributeWriter::buildFieldPaths(const DocumentType & docType, const DataType *dataType) -{ - for (auto &wc : _writeContexts) { - wc.buildFieldPaths(docType); - } - _dataType = dataType; -} - -void AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool allAttributes, OnWriteDoneType onWriteDone) { - const DataType *dataType(doc.getDataType()); - if (_dataType != dataType) { - buildFieldPaths(doc.getType(), dataType); - } - auto extractor = std::make_shared<DocumentFieldExtractor>(doc); for (const auto &wc : _writeContexts) { if (wc.use_two_phase_put()) { assert(wc.getFields().size() == 1); - auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor); + wc.consider_build_field_paths(doc); + auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc, doc); auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); _shared_executor.execute(std::move(prepare_task)); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); } else { if (allAttributes || wc.hasStructFieldAttribute()) { - auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, allAttributes, onWriteDone); + auto putTask = std::make_unique<PutTask>(wc, serialNum, doc, lid, allAttributes, onWriteDone); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 7605b130689..726005ae04b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -32,7 +32,7 @@ public: * Represents an attribute vector for a field and details about how to write to it. */ class WriteField { - FieldPath _fieldPath; + mutable FieldPath _fieldPath; AttributeVector &_attribute; bool _structFieldAttribute; // in array/map of struct bool _use_two_phase_put; @@ -41,7 +41,7 @@ public: ~WriteField(); AttributeVector &getAttribute() const { return _attribute; } const FieldPath &getFieldPath() const { return _fieldPath; } - void buildFieldPath(const DocumentType &docType); + void buildFieldPath(const DocumentType &docType) const; bool isStructFieldAttribute() const { return _structFieldAttribute; } bool use_two_phase_put() const { return _use_two_phase_put; } }; @@ -52,6 +52,8 @@ public: class WriteContext { ExecutorId _executorId; std::vector<WriteField> _fields; + mutable const DataType* _data_type; + mutable std::shared_ptr<const FieldPath> _two_phase_put_field_path; bool _hasStructFieldAttribute; // When this is true, the context only contains a single field. bool _use_two_phase_put; @@ -60,12 +62,13 @@ public: WriteContext(WriteContext &&rhs) noexcept; ~WriteContext(); WriteContext &operator=(WriteContext &&rhs) noexcept; - void buildFieldPaths(const DocumentType &docType); + void consider_build_field_paths(const Document& doc) const; void add(AttributeVector &attr); ExecutorId getExecutorId() const { return _executorId; } const std::vector<WriteField> &getFields() const { return _fields; } bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; } bool use_two_phase_put() const { return _use_two_phase_put; } + std::shared_ptr<const FieldPath> get_two_phase_put_field_path() const noexcept { return _two_phase_put_field_path; } }; struct AttributeWithInfo { |