diff options
3 files changed, 128 insertions, 25 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 754cf99b00a..34bb88a19dd 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -803,12 +803,14 @@ public: mutable size_t prepare_set_tensor_cnt; mutable size_t complete_set_tensor_cnt; size_t clear_doc_cnt; + const Value* exp_tensor; MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg) : DenseTensorAttribute(name, cfg), prepare_set_tensor_cnt(0), complete_set_tensor_cnt(0), - clear_doc_cnt(0) + clear_doc_cnt(0), + exp_tensor() {} uint32_t clearDoc(DocId docid) override { ++clear_doc_cnt; @@ -816,6 +818,7 @@ public: } std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Value& tensor) const override { ++prepare_set_tensor_cnt; + EXPECT_EQ(*exp_tensor, tensor); return std::make_unique<MockPrepareResult>(docid, tensor); } @@ -873,6 +876,7 @@ class TwoPhasePutTest : public AttributeWriterTest { public: Schema schema; DocBuilder builder; + vespalib::string doc_id; std::shared_ptr<MockDenseTensorAttribute> attr; std::unique_ptr<Value> tensor; @@ -880,6 +884,7 @@ public: : AttributeWriterTest(), schema(createTensorSchema(dense_tensor)), builder(schema), + doc_id("id:ns:searchdocument::1"), attr() { setup(2); @@ -889,6 +894,7 @@ public: attr->clear_doc_cnt = 0; tensor = make_tensor(TensorSpec(dense_tensor) .add({{"x", 0}}, 3).add({{"x", 1}}, 5)); + attr->exp_tensor = tensor.get(); } void expect_tensor_attr_calls(size_t exp_prepare_cnt, size_t exp_complete_cnt, @@ -901,13 +907,23 @@ public: return createTensorPutDoc(builder, *tensor); } Document::UP make_no_field_doc() { - return builder.startDocument("id:ns:searchdocument::1").endDocument(); + return builder.startDocument(doc_id).endDocument(); } Document::UP make_no_tensor_doc() { - return builder.startDocument("id:ns:searchdocument::1"). + return builder.startDocument(doc_id). startAttributeField("a1"). addTensor(std::unique_ptr<vespalib::eval::Value>()).endField().endDocument(); } + DocumentUpdate::UP make_assign_update() { + auto upd = std::make_unique<DocumentUpdate>(*builder.getDocumentTypeRepo(), + builder.getDocumentType(), + DocumentId(doc_id)); + TensorDataType tensor_type(vespalib::eval::ValueType::from_spec(dense_tensor)); + TensorFieldValue tensor_value(tensor_type); + tensor_value= SimpleValue::from_value(*tensor); + upd->addUpdate(FieldUpdate(upd->getType().getField("a1")).addUpdate(AssignValueUpdate(tensor_value))); + return upd; + } void expect_shared_executor_tasks(size_t exp_accepted_tasks) { auto stats = _shared.getStats(); EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks); @@ -958,6 +974,22 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set) assertExecuteHistory({0}); } +TEST_F(TwoPhasePutTest, handles_assign_update_as_two_phase_put_when_specified_for_tensor_attribute) +{ + auto upd = make_assign_update(); + + DummyFieldUpdateCallback on_update; + update(1, *upd, 1, on_update); + expect_tensor_attr_calls(1, 1); + expect_shared_executor_tasks(1); + assertExecuteHistory({0}); + + update(2, *upd, 2, on_update); + expect_tensor_attr_calls(2, 2); + expect_shared_executor_tasks(2); + assertExecuteHistory({0, 0}); +} + ImportedAttributeVector::SP createImportedAttribute(const vespalib::string &name) diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 218a37139bb..8334be9e29f 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -7,13 +7,14 @@ #include <vespa/document/base/exceptions.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/assignvalueupdate.h> +#include <vespa/searchcommon/attribute/attribute_utils.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> #include <vespa/searchcore/proton/common/attribute_updater.h> #include <vespa/searchlib/attribute/imported_attribute_vector.h> -#include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/searchlib/tensor/prepare_result.h> -#include <vespa/searchcommon/attribute/attribute_utils.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/threadexecutor.h> #include <future> @@ -114,6 +115,21 @@ AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType) } } +AttributeWriter::AttributeWithInfo::AttributeWithInfo() + : attribute(), + executor_id(), + use_two_phase_put_for_assign_updates(false) +{ +} + +AttributeWriter::AttributeWithInfo::AttributeWithInfo(search::AttributeVector* attribute_in, + ExecutorId executor_id_in) + : attribute(attribute_in), + executor_id(executor_id_in), + use_two_phase_put_for_assign_updates(use_two_phase_put_for_attribute(*attribute_in)) +{ +} + namespace { void @@ -340,10 +356,14 @@ private: std::promise<std::unique_ptr<PrepareResult>> _result_promise; public: - PreparePutTask(SerialNum serial_num_in, - uint32_t docid_in, + PreparePutTask(SerialNum serial_num, + uint32_t docid, const AttributeWriter::WriteField& field, std::shared_ptr<DocumentFieldExtractor> field_extractor); + PreparePutTask(SerialNum serial_num, + uint32_t docid, + AttributeVector& attr, + const FieldValue& field_value); ~PreparePutTask() override; void run() override; SerialNum serial_num() const { return _serial_num; } @@ -355,12 +375,12 @@ public: } }; -PreparePutTask::PreparePutTask(SerialNum serial_num_in, - uint32_t docid_in, +PreparePutTask::PreparePutTask(SerialNum serial_num, + uint32_t docid, const AttributeWriter::WriteField& field, std::shared_ptr<DocumentFieldExtractor> field_extractor) - : _serial_num(serial_num_in), - _docid(docid_in), + : _serial_num(serial_num), + _docid(docid), _attr(field.getAttribute()), _field_value(), _result_promise() @@ -370,6 +390,18 @@ PreparePutTask::PreparePutTask(SerialNum serial_num_in, _field_value.reset(value.release()); } +PreparePutTask::PreparePutTask(SerialNum serial_num, + uint32_t docid, + AttributeVector& attr, + const FieldValue& field_value) + : _serial_num(serial_num), + _docid(docid), + _attr(attr), + _field_value(field_value.clone()), + _result_promise() +{ +} + PreparePutTask::~PreparePutTask() = default; void @@ -610,7 +642,7 @@ AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr) void AttributeWriter::setupAttriuteMapping() { for (auto attr : getWritableAttributes()) { vespalib::stringref name = attr->getName(); - _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())); + _attrMap[name] = AttributeWithInfo(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())); } } @@ -664,6 +696,25 @@ AttributeWriter::remove(const LidVector &lidsToRemove, SerialNum serialNum, OnWr } } +namespace { + +bool +is_single_assign_update(const FieldUpdate& update) +{ + return (update.getUpdates().size() == 1) && + (update.getUpdates()[0]->getType() == ValueUpdate::Assign) && + (static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]).hasValue()); +} + +const FieldValue& +get_single_assign_update_field_value(const FieldUpdate& update) +{ + const auto& assign = static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]); + return assign.getValue(); +} + +} + void AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, DocumentIdT lid, OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate) @@ -679,8 +730,8 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document for (const auto &fupd : upd.getUpdates()) { LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().data()); - auto found = _attrMap.find(fupd.getField().getName()); - AttributeVector * attrp = (found != _attrMap.end()) ? found->second.first : nullptr; + auto itr = _attrMap.find(fupd.getField().getName()); + AttributeVector * attrp = (itr != _attrMap.end()) ? itr->second.attribute : nullptr; onUpdate.onUpdateField(fupd.getField().getName(), attrp); if (__builtin_expect(attrp == nullptr, false)) { LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().data()); @@ -688,10 +739,21 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document } // TODO: Check if we must use > due to multiple entries for same // document and attribute. - if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false)) + if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false)) { continue; - args[found->second.second.getId()]->_updates.emplace_back(attrp, &fupd); - LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str()); + } + if (itr->second.use_two_phase_put_for_assign_updates && + is_single_assign_update(fupd)) { + auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, *attrp, get_single_assign_update_field_value(fupd)); + auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); + LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'", + lid, attrp->getName().c_str()); + _shared_executor.execute(std::move(prepare_task)); + _attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task)); + } else { + args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd); + LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str()); + } } // NOTE: The lifetime of the field update will be ensured by keeping the document update alive // in a operation done context object. @@ -708,8 +770,8 @@ void AttributeWriter::heartBeat(SerialNum serialNum) { for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.second, - [serialNum, attr=entry.second.first]() + _attributeFieldWriter.execute(entry.second.executor_id, + [serialNum, attr=entry.second.attribute]() { applyHeartBeat(serialNum, *attr); }); } } @@ -737,8 +799,8 @@ void AttributeWriter::onReplayDone(uint32_t docIdLimit) { for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.second, - [docIdLimit, attr = entry.second.first]() + _attributeFieldWriter.execute(entry.second.executor_id, + [docIdLimit, attr = entry.second.attribute]() { applyReplayDone(docIdLimit, *attr); }); } _attributeFieldWriter.sync(); @@ -749,8 +811,8 @@ void AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum) { for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.second, - [wantedLidLimit, serialNum, attr=entry.second.first]() + _attributeFieldWriter.execute(entry.second.executor_id, + [wantedLidLimit, serialNum, attr=entry.second.attribute]() { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); }); } _attributeFieldWriter.sync(); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 1c55f53aa5e..933de1a6c14 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -67,9 +67,18 @@ public: bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; } bool use_two_phase_put() const { return _use_two_phase_put; } }; + + struct AttributeWithInfo { + search::AttributeVector* attribute; + ExecutorId executor_id; + bool use_two_phase_put_for_assign_updates; + + AttributeWithInfo(); + AttributeWithInfo(search::AttributeVector* attribute_in, + ExecutorId executor_id_in); + }; private: - using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>; - using AttrMap = vespalib::hash_map<vespalib::string, AttrWithId>; + using AttrMap = vespalib::hash_map<vespalib::string, AttributeWithInfo>; std::vector<WriteContext> _writeContexts; const DataType *_dataType; bool _hasStructFieldAttribute; |