summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_test.cpp38
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp102
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h13
3 files changed, 128 insertions, 25 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp
index 754cf99b00a..34bb88a19dd 100644
--- a/searchcore/src/tests/proton/attribute/attribute_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp
@@ -803,12 +803,14 @@ public:
mutable size_t prepare_set_tensor_cnt;
mutable size_t complete_set_tensor_cnt;
size_t clear_doc_cnt;
+ const Value* exp_tensor;
MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg)
: DenseTensorAttribute(name, cfg),
prepare_set_tensor_cnt(0),
complete_set_tensor_cnt(0),
- clear_doc_cnt(0)
+ clear_doc_cnt(0),
+ exp_tensor()
{}
uint32_t clearDoc(DocId docid) override {
++clear_doc_cnt;
@@ -816,6 +818,7 @@ public:
}
std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Value& tensor) const override {
++prepare_set_tensor_cnt;
+ EXPECT_EQ(*exp_tensor, tensor);
return std::make_unique<MockPrepareResult>(docid, tensor);
}
@@ -873,6 +876,7 @@ class TwoPhasePutTest : public AttributeWriterTest {
public:
Schema schema;
DocBuilder builder;
+ vespalib::string doc_id;
std::shared_ptr<MockDenseTensorAttribute> attr;
std::unique_ptr<Value> tensor;
@@ -880,6 +884,7 @@ public:
: AttributeWriterTest(),
schema(createTensorSchema(dense_tensor)),
builder(schema),
+ doc_id("id:ns:searchdocument::1"),
attr()
{
setup(2);
@@ -889,6 +894,7 @@ public:
attr->clear_doc_cnt = 0;
tensor = make_tensor(TensorSpec(dense_tensor)
.add({{"x", 0}}, 3).add({{"x", 1}}, 5));
+ attr->exp_tensor = tensor.get();
}
void expect_tensor_attr_calls(size_t exp_prepare_cnt,
size_t exp_complete_cnt,
@@ -901,13 +907,23 @@ public:
return createTensorPutDoc(builder, *tensor);
}
Document::UP make_no_field_doc() {
- return builder.startDocument("id:ns:searchdocument::1").endDocument();
+ return builder.startDocument(doc_id).endDocument();
}
Document::UP make_no_tensor_doc() {
- return builder.startDocument("id:ns:searchdocument::1").
+ return builder.startDocument(doc_id).
startAttributeField("a1").
addTensor(std::unique_ptr<vespalib::eval::Value>()).endField().endDocument();
}
+ DocumentUpdate::UP make_assign_update() {
+ auto upd = std::make_unique<DocumentUpdate>(*builder.getDocumentTypeRepo(),
+ builder.getDocumentType(),
+ DocumentId(doc_id));
+ TensorDataType tensor_type(vespalib::eval::ValueType::from_spec(dense_tensor));
+ TensorFieldValue tensor_value(tensor_type);
+ tensor_value= SimpleValue::from_value(*tensor);
+ upd->addUpdate(FieldUpdate(upd->getType().getField("a1")).addUpdate(AssignValueUpdate(tensor_value)));
+ return upd;
+ }
void expect_shared_executor_tasks(size_t exp_accepted_tasks) {
auto stats = _shared.getStats();
EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks);
@@ -958,6 +974,22 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
assertExecuteHistory({0});
}
+TEST_F(TwoPhasePutTest, handles_assign_update_as_two_phase_put_when_specified_for_tensor_attribute)
+{
+ auto upd = make_assign_update();
+
+ DummyFieldUpdateCallback on_update;
+ update(1, *upd, 1, on_update);
+ expect_tensor_attr_calls(1, 1);
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
+
+ update(2, *upd, 2, on_update);
+ expect_tensor_attr_calls(2, 2);
+ expect_shared_executor_tasks(2);
+ assertExecuteHistory({0, 0});
+}
+
ImportedAttributeVector::SP
createImportedAttribute(const vespalib::string &name)
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 218a37139bb..8334be9e29f 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -7,13 +7,14 @@
#include <vespa/document/base/exceptions.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/searchcommon/attribute/attribute_utils.h>
#include <vespa/searchcore/proton/attribute/imported_attributes_repo.h>
#include <vespa/searchcore/proton/common/attribute_updater.h>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
-#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/searchlib/tensor/prepare_result.h>
-#include <vespa/searchcommon/attribute/attribute_utils.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
@@ -114,6 +115,21 @@ AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType)
}
}
+AttributeWriter::AttributeWithInfo::AttributeWithInfo()
+ : attribute(),
+ executor_id(),
+ use_two_phase_put_for_assign_updates(false)
+{
+}
+
+AttributeWriter::AttributeWithInfo::AttributeWithInfo(search::AttributeVector* attribute_in,
+ ExecutorId executor_id_in)
+ : attribute(attribute_in),
+ executor_id(executor_id_in),
+ use_two_phase_put_for_assign_updates(use_two_phase_put_for_attribute(*attribute_in))
+{
+}
+
namespace {
void
@@ -340,10 +356,14 @@ private:
std::promise<std::unique_ptr<PrepareResult>> _result_promise;
public:
- PreparePutTask(SerialNum serial_num_in,
- uint32_t docid_in,
+ PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
const AttributeWriter::WriteField& field,
std::shared_ptr<DocumentFieldExtractor> field_extractor);
+ PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
+ AttributeVector& attr,
+ const FieldValue& field_value);
~PreparePutTask() override;
void run() override;
SerialNum serial_num() const { return _serial_num; }
@@ -355,12 +375,12 @@ public:
}
};
-PreparePutTask::PreparePutTask(SerialNum serial_num_in,
- uint32_t docid_in,
+PreparePutTask::PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
const AttributeWriter::WriteField& field,
std::shared_ptr<DocumentFieldExtractor> field_extractor)
- : _serial_num(serial_num_in),
- _docid(docid_in),
+ : _serial_num(serial_num),
+ _docid(docid),
_attr(field.getAttribute()),
_field_value(),
_result_promise()
@@ -370,6 +390,18 @@ PreparePutTask::PreparePutTask(SerialNum serial_num_in,
_field_value.reset(value.release());
}
+PreparePutTask::PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
+ AttributeVector& attr,
+ const FieldValue& field_value)
+ : _serial_num(serial_num),
+ _docid(docid),
+ _attr(attr),
+ _field_value(field_value.clone()),
+ _result_promise()
+{
+}
+
PreparePutTask::~PreparePutTask() = default;
void
@@ -610,7 +642,7 @@ AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr)
void AttributeWriter::setupAttriuteMapping() {
for (auto attr : getWritableAttributes()) {
vespalib::stringref name = attr->getName();
- _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix()));
+ _attrMap[name] = AttributeWithInfo(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix()));
}
}
@@ -664,6 +696,25 @@ AttributeWriter::remove(const LidVector &lidsToRemove, SerialNum serialNum, OnWr
}
}
+namespace {
+
+bool
+is_single_assign_update(const FieldUpdate& update)
+{
+ return (update.getUpdates().size() == 1) &&
+ (update.getUpdates()[0]->getType() == ValueUpdate::Assign) &&
+ (static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]).hasValue());
+}
+
+const FieldValue&
+get_single_assign_update_field_value(const FieldUpdate& update)
+{
+ const auto& assign = static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]);
+ return assign.getValue();
+}
+
+}
+
void
AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, DocumentIdT lid,
OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate)
@@ -679,8 +730,8 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
for (const auto &fupd : upd.getUpdates()) {
LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().data());
- auto found = _attrMap.find(fupd.getField().getName());
- AttributeVector * attrp = (found != _attrMap.end()) ? found->second.first : nullptr;
+ auto itr = _attrMap.find(fupd.getField().getName());
+ AttributeVector * attrp = (itr != _attrMap.end()) ? itr->second.attribute : nullptr;
onUpdate.onUpdateField(fupd.getField().getName(), attrp);
if (__builtin_expect(attrp == nullptr, false)) {
LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().data());
@@ -688,10 +739,21 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
}
// TODO: Check if we must use > due to multiple entries for same
// document and attribute.
- if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false))
+ if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false)) {
continue;
- args[found->second.second.getId()]->_updates.emplace_back(attrp, &fupd);
- LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
+ }
+ if (itr->second.use_two_phase_put_for_assign_updates &&
+ is_single_assign_update(fupd)) {
+ auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, *attrp, get_single_assign_update_field_value(fupd));
+ auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone);
+ LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'",
+ lid, attrp->getName().c_str());
+ _shared_executor.execute(std::move(prepare_task));
+ _attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task));
+ } else {
+ args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd);
+ LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
+ }
}
// NOTE: The lifetime of the field update will be ensured by keeping the document update alive
// in a operation done context object.
@@ -708,8 +770,8 @@ void
AttributeWriter::heartBeat(SerialNum serialNum)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [serialNum, attr=entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [serialNum, attr=entry.second.attribute]()
{ applyHeartBeat(serialNum, *attr); });
}
}
@@ -737,8 +799,8 @@ void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [docIdLimit, attr = entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [docIdLimit, attr = entry.second.attribute]()
{ applyReplayDone(docIdLimit, *attr); });
}
_attributeFieldWriter.sync();
@@ -749,8 +811,8 @@ void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [wantedLidLimit, serialNum, attr=entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [wantedLidLimit, serialNum, attr=entry.second.attribute]()
{ applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
}
_attributeFieldWriter.sync();
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 1c55f53aa5e..933de1a6c14 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -67,9 +67,18 @@ public:
bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; }
bool use_two_phase_put() const { return _use_two_phase_put; }
};
+
+ struct AttributeWithInfo {
+ search::AttributeVector* attribute;
+ ExecutorId executor_id;
+ bool use_two_phase_put_for_assign_updates;
+
+ AttributeWithInfo();
+ AttributeWithInfo(search::AttributeVector* attribute_in,
+ ExecutorId executor_id_in);
+ };
private:
- using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>;
- using AttrMap = vespalib::hash_map<vespalib::string, AttrWithId>;
+ using AttrMap = vespalib::hash_map<vespalib::string, AttributeWithInfo>;
std::vector<WriteContext> _writeContexts;
const DataType *_dataType;
bool _hasStructFieldAttribute;