aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp129
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h9
2 files changed, 86 insertions, 52 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 57ab149404d..0f044f0dff8 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -63,7 +63,7 @@ AttributeWriter::WriteField::WriteField(AttributeVector &attribute)
AttributeWriter::WriteField::~WriteField() = default;
void
-AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType)
+AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) const
{
const vespalib::string &name = _attribute.getName();
FieldPath fp;
@@ -80,6 +80,8 @@ AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType)
AttributeWriter::WriteContext::WriteContext(ExecutorId executorId) noexcept
: _executorId(executorId),
_fields(),
+ _data_type(nullptr),
+ _two_phase_put_field_path(),
_hasStructFieldAttribute(false),
_use_two_phase_put(false)
{
@@ -108,10 +110,18 @@ AttributeWriter::WriteContext::add(AttributeVector &attr)
}
void
-AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType)
-{
- for (auto &field : _fields) {
- field.buildFieldPath(docType);
+AttributeWriter::WriteContext::consider_build_field_paths(const Document& doc) const
+{
+ auto data_type = doc.getDataType();
+ if (_data_type != data_type) {
+ _data_type = data_type;
+ auto& doc_type = doc.getType();
+ for (auto &field : _fields) {
+ field.buildFieldPath(doc_type);
+ }
+ if (_use_two_phase_put) {
+ _two_phase_put_field_path = std::make_shared<const FieldPath>(_fields[0].getFieldPath());
+ }
}
}
@@ -154,18 +164,43 @@ applyPutToAttribute(SerialNum serialNum, const FieldValue::UP &fieldValue, Docum
attr.commitIfChangeVectorTooLarge();
}
+class FieldValueAndPrepareResult {
+ std::unique_ptr<const FieldValue> _field_value;
+ std::unique_ptr<PrepareResult> _prepare_result;
+public:
+ FieldValueAndPrepareResult(std::unique_ptr<const FieldValue> field_value,
+ std::unique_ptr<PrepareResult> prepare_result)
+ : _field_value(std::move(field_value)),
+ _prepare_result(std::move(prepare_result))
+ {
+ }
+ FieldValueAndPrepareResult()
+ : _field_value(),
+ _prepare_result()
+ {
+ }
+ ~FieldValueAndPrepareResult();
+ FieldValueAndPrepareResult(FieldValueAndPrepareResult&&);
+ const std::unique_ptr<const FieldValue>& get_field_value() const noexcept { return _field_value; }
+ std::unique_ptr<PrepareResult> steal_prepare_result() { return std::move(_prepare_result); }
+};
+
+FieldValueAndPrepareResult::~FieldValueAndPrepareResult() = default;
+
+FieldValueAndPrepareResult::FieldValueAndPrepareResult(FieldValueAndPrepareResult&&) = default;
+
void
complete_put_to_attribute(SerialNum serial_num,
uint32_t docid,
AttributeVector& attr,
- const FieldValue::SP& field_value,
- std::future<std::unique_ptr<PrepareResult>>& result_future,
+ std::future<FieldValueAndPrepareResult> result_future,
AttributeWriter::OnWriteDoneType)
{
ensureLidSpace(serial_num, docid, attr);
- if (field_value.get()) {
- auto result = result_future.get();
- AttributeUpdater::complete_set_value(attr, docid, *field_value, std::move(result));
+ auto result = result_future.get();
+ auto& field_value = result.get_field_value();
+ if (field_value) {
+ AttributeUpdater::complete_set_value(attr, docid, *field_value, result.steal_prepare_result());
} else {
attr.clearDoc(docid);
}
@@ -303,28 +338,30 @@ class PutTask : public vespalib::Executor::Task
const uint32_t _lid;
const bool _allAttributes;
std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
- std::shared_ptr<DocumentFieldExtractor> _fieldExtractor;
+ const Document& _doc;
std::vector<FieldValue::UP> _fieldValues;
public:
- PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, std::shared_ptr<DocumentFieldExtractor> fieldExtractor, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone);
+ PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document& doc, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone);
~PutTask() override;
void run() override;
};
-PutTask::PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, std::shared_ptr<DocumentFieldExtractor> fieldExtractor, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone)
+PutTask::PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document& doc, uint32_t lid, bool allAttributes, AttributeWriter::OnWriteDoneType onWriteDone)
: _wc(wc),
_serialNum(serialNum),
_lid(lid),
_allAttributes(allAttributes),
_onWriteDone(onWriteDone),
- _fieldExtractor(std::move(fieldExtractor)),
+ _doc(doc),
_fieldValues()
{
+ _wc.consider_build_field_paths(_doc);
+ DocumentFieldExtractor field_extractor(_doc);
const auto &fields = _wc.getFields();
_fieldValues.reserve(fields.size());
- for (const auto &field : fields) {
+ for (const auto& field : fields) {
if (_allAttributes || field.isStructFieldAttribute()) {
- FieldValue::UP fv = _fieldExtractor->getFieldValue(field.getFieldPath());
+ FieldValue::UP fv = field_extractor.getFieldValue(field.getFieldPath());
_fieldValues.emplace_back(std::move(fv));
}
}
@@ -348,20 +385,21 @@ PutTask::run()
}
}
-
class PreparePutTask : public vespalib::Executor::Task {
private:
const SerialNum _serial_num;
const uint32_t _docid;
AttributeVector& _attr;
- FieldValue::SP _field_value;
- std::promise<std::unique_ptr<PrepareResult>> _result_promise;
+ std::shared_ptr<const FieldPath> _field_path;
+ const Document* const _doc;
+ std::unique_ptr<FieldValue> _field_value;
+ std::promise<FieldValueAndPrepareResult> _result_promise;
public:
PreparePutTask(SerialNum serial_num,
uint32_t docid,
- const AttributeWriter::WriteField& field,
- std::shared_ptr<DocumentFieldExtractor> field_extractor);
+ const AttributeWriter::WriteContext& wc,
+ const Document& doc);
PreparePutTask(SerialNum serial_num,
uint32_t docid,
AttributeVector& attr,
@@ -371,25 +409,27 @@ public:
SerialNum serial_num() const { return _serial_num; }
uint32_t docid() const { return _docid; }
AttributeVector& attr() { return _attr; }
- FieldValue::SP field_value() { return _field_value; }
- std::future<std::unique_ptr<PrepareResult>> result_future() {
+ std::future<FieldValueAndPrepareResult> result_future() {
return _result_promise.get_future();
}
};
PreparePutTask::PreparePutTask(SerialNum serial_num,
uint32_t docid,
- const AttributeWriter::WriteField& field,
- std::shared_ptr<DocumentFieldExtractor> field_extractor)
+ const AttributeWriter::WriteContext& wc,
+ const Document& doc)
: _serial_num(serial_num),
_docid(docid),
- _attr(field.getAttribute()),
+ _attr(wc.getFields()[0].getAttribute()),
+ _field_path(wc.get_two_phase_put_field_path()),
+ _doc(&doc),
_field_value(),
_result_promise()
{
- // Note: No need to store the field extractor as we are not extracting struct fields.
- auto value = field_extractor->getFieldValue(field.getFieldPath());
- _field_value.reset(value.release());
+ if (_field_path) {
+ DocumentFieldExtractor field_extractor(*_doc);
+ _field_value = field_extractor.getFieldValue(*_field_path);
+ }
}
PreparePutTask::PreparePutTask(SerialNum serial_num,
@@ -399,6 +439,8 @@ PreparePutTask::PreparePutTask(SerialNum serial_num,
: _serial_num(serial_num),
_docid(docid),
_attr(attr),
+ _field_path(),
+ _doc(nullptr),
_field_value(field_value.clone()),
_result_promise()
{
@@ -411,7 +453,10 @@ PreparePutTask::run()
{
if (_attr.getStatus().getLastSyncToken() < _serial_num) {
if (_field_value.get()) {
- _result_promise.set_value(AttributeUpdater::prepare_set_value(_attr, _docid, *_field_value));
+ auto& fv = *_field_value;
+ _result_promise.set_value(FieldValueAndPrepareResult(std::move(_field_value), AttributeUpdater::prepare_set_value(_attr, _docid, fv)));
+ } else {
+ _result_promise.set_value(FieldValueAndPrepareResult());
}
}
}
@@ -422,7 +467,7 @@ private:
const uint32_t _docid;
AttributeVector& _attr;
FieldValue::SP _field_value;
- std::future<std::unique_ptr<PrepareResult>> _result_future;
+ std::future<FieldValueAndPrepareResult> _result_future;
std::remove_reference_t<AttributeWriter::OnWriteDoneType> _on_write_done;
public:
@@ -437,7 +482,6 @@ CompletePutTask::CompletePutTask(PreparePutTask& prepare_task,
: _serial_num(prepare_task.serial_num()),
_docid(prepare_task.docid()),
_attr(prepare_task.attr()),
- _field_value(prepare_task.field_value()),
_result_future(prepare_task.result_future()),
_on_write_done(on_write_done)
{
@@ -449,7 +493,7 @@ void
CompletePutTask::run()
{
if (_attr.getStatus().getLastSyncToken() < _serial_num) {
- complete_put_to_attribute(_serial_num, _docid, _attr, _field_value, _result_future, _on_write_done);
+ complete_put_to_attribute(_serial_num, _docid, _attr, std::move(_result_future), _on_write_done);
}
}
@@ -587,33 +631,20 @@ AttributeWriter::setupWriteContexts()
}
void
-AttributeWriter::buildFieldPaths(const DocumentType & docType, const DataType *dataType)
-{
- for (auto &wc : _writeContexts) {
- wc.buildFieldPaths(docType);
- }
- _dataType = dataType;
-}
-
-void
AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid,
bool allAttributes, OnWriteDoneType onWriteDone)
{
- const DataType *dataType(doc.getDataType());
- if (_dataType != dataType) {
- buildFieldPaths(doc.getType(), dataType);
- }
- auto extractor = std::make_shared<DocumentFieldExtractor>(doc);
for (const auto &wc : _writeContexts) {
if (wc.use_two_phase_put()) {
assert(wc.getFields().size() == 1);
- auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor);
+ wc.consider_build_field_paths(doc);
+ auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc, doc);
auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone);
_shared_executor.execute(std::move(prepare_task));
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task));
} else {
if (allAttributes || wc.hasStructFieldAttribute()) {
- auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, allAttributes, onWriteDone);
+ auto putTask = std::make_unique<PutTask>(wc, serialNum, doc, lid, allAttributes, onWriteDone);
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 7605b130689..726005ae04b 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -32,7 +32,7 @@ public:
* Represents an attribute vector for a field and details about how to write to it.
*/
class WriteField {
- FieldPath _fieldPath;
+ mutable FieldPath _fieldPath;
AttributeVector &_attribute;
bool _structFieldAttribute; // in array/map of struct
bool _use_two_phase_put;
@@ -41,7 +41,7 @@ public:
~WriteField();
AttributeVector &getAttribute() const { return _attribute; }
const FieldPath &getFieldPath() const { return _fieldPath; }
- void buildFieldPath(const DocumentType &docType);
+ void buildFieldPath(const DocumentType &docType) const;
bool isStructFieldAttribute() const { return _structFieldAttribute; }
bool use_two_phase_put() const { return _use_two_phase_put; }
};
@@ -52,6 +52,8 @@ public:
class WriteContext {
ExecutorId _executorId;
std::vector<WriteField> _fields;
+ mutable const DataType* _data_type;
+ mutable std::shared_ptr<const FieldPath> _two_phase_put_field_path;
bool _hasStructFieldAttribute;
// When this is true, the context only contains a single field.
bool _use_two_phase_put;
@@ -60,12 +62,13 @@ public:
WriteContext(WriteContext &&rhs) noexcept;
~WriteContext();
WriteContext &operator=(WriteContext &&rhs) noexcept;
- void buildFieldPaths(const DocumentType &docType);
+ void consider_build_field_paths(const Document& doc) const;
void add(AttributeVector &attr);
ExecutorId getExecutorId() const { return _executorId; }
const std::vector<WriteField> &getFields() const { return _fields; }
bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; }
bool use_two_phase_put() const { return _use_two_phase_put; }
+ std::shared_ptr<const FieldPath> get_two_phase_put_field_path() const noexcept { return _two_phase_put_field_path; }
};
struct AttributeWithInfo {