summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-07-14 14:11:04 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-07-14 14:11:04 +0000
commit1396ee4349c915148319e92b565e7b4397f6080a (patch)
treeec2c0353ae5cdf74253ccf6a1df1586e0a9c505e /searchcore
parent02b1f56dc2b76f9b0bb59fd0a84c4353f8cba089 (diff)
Optimize assign updates to tensor attribute with hnsw index by handling them as two phase put operations.
This ensures that the costly part of inserting into the hnsw index is done in parallel in the prepare put step.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_test.cpp38
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp102
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h13
3 files changed, 128 insertions, 25 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp
index 754cf99b00a..34bb88a19dd 100644
--- a/searchcore/src/tests/proton/attribute/attribute_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp
@@ -803,12 +803,14 @@ public:
mutable size_t prepare_set_tensor_cnt;
mutable size_t complete_set_tensor_cnt;
size_t clear_doc_cnt;
+ const Value* exp_tensor;
MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg)
: DenseTensorAttribute(name, cfg),
prepare_set_tensor_cnt(0),
complete_set_tensor_cnt(0),
- clear_doc_cnt(0)
+ clear_doc_cnt(0),
+ exp_tensor()
{}
uint32_t clearDoc(DocId docid) override {
++clear_doc_cnt;
@@ -816,6 +818,7 @@ public:
}
std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Value& tensor) const override {
++prepare_set_tensor_cnt;
+ EXPECT_EQ(*exp_tensor, tensor);
return std::make_unique<MockPrepareResult>(docid, tensor);
}
@@ -873,6 +876,7 @@ class TwoPhasePutTest : public AttributeWriterTest {
public:
Schema schema;
DocBuilder builder;
+ vespalib::string doc_id;
std::shared_ptr<MockDenseTensorAttribute> attr;
std::unique_ptr<Value> tensor;
@@ -880,6 +884,7 @@ public:
: AttributeWriterTest(),
schema(createTensorSchema(dense_tensor)),
builder(schema),
+ doc_id("id:ns:searchdocument::1"),
attr()
{
setup(2);
@@ -889,6 +894,7 @@ public:
attr->clear_doc_cnt = 0;
tensor = make_tensor(TensorSpec(dense_tensor)
.add({{"x", 0}}, 3).add({{"x", 1}}, 5));
+ attr->exp_tensor = tensor.get();
}
void expect_tensor_attr_calls(size_t exp_prepare_cnt,
size_t exp_complete_cnt,
@@ -901,13 +907,23 @@ public:
return createTensorPutDoc(builder, *tensor);
}
Document::UP make_no_field_doc() {
- return builder.startDocument("id:ns:searchdocument::1").endDocument();
+ return builder.startDocument(doc_id).endDocument();
}
Document::UP make_no_tensor_doc() {
- return builder.startDocument("id:ns:searchdocument::1").
+ return builder.startDocument(doc_id).
startAttributeField("a1").
addTensor(std::unique_ptr<vespalib::eval::Value>()).endField().endDocument();
}
+ DocumentUpdate::UP make_assign_update() {
+ auto upd = std::make_unique<DocumentUpdate>(*builder.getDocumentTypeRepo(),
+ builder.getDocumentType(),
+ DocumentId(doc_id));
+ TensorDataType tensor_type(vespalib::eval::ValueType::from_spec(dense_tensor));
+ TensorFieldValue tensor_value(tensor_type);
+ tensor_value= SimpleValue::from_value(*tensor);
+ upd->addUpdate(FieldUpdate(upd->getType().getField("a1")).addUpdate(AssignValueUpdate(tensor_value)));
+ return upd;
+ }
void expect_shared_executor_tasks(size_t exp_accepted_tasks) {
auto stats = _shared.getStats();
EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks);
@@ -958,6 +974,22 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
assertExecuteHistory({0});
}
+TEST_F(TwoPhasePutTest, handles_assign_update_as_two_phase_put_when_specified_for_tensor_attribute)
+{
+ auto upd = make_assign_update();
+
+ DummyFieldUpdateCallback on_update;
+ update(1, *upd, 1, on_update);
+ expect_tensor_attr_calls(1, 1);
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
+
+ update(2, *upd, 2, on_update);
+ expect_tensor_attr_calls(2, 2);
+ expect_shared_executor_tasks(2);
+ assertExecuteHistory({0, 0});
+}
+
ImportedAttributeVector::SP
createImportedAttribute(const vespalib::string &name)
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 218a37139bb..8334be9e29f 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -7,13 +7,14 @@
#include <vespa/document/base/exceptions.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/searchcommon/attribute/attribute_utils.h>
#include <vespa/searchcore/proton/attribute/imported_attributes_repo.h>
#include <vespa/searchcore/proton/common/attribute_updater.h>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
-#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/searchlib/tensor/prepare_result.h>
-#include <vespa/searchcommon/attribute/attribute_utils.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
@@ -114,6 +115,21 @@ AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType)
}
}
+AttributeWriter::AttributeWithInfo::AttributeWithInfo()
+ : attribute(),
+ executor_id(),
+ use_two_phase_put_for_assign_updates(false)
+{
+}
+
+AttributeWriter::AttributeWithInfo::AttributeWithInfo(search::AttributeVector* attribute_in,
+ ExecutorId executor_id_in)
+ : attribute(attribute_in),
+ executor_id(executor_id_in),
+ use_two_phase_put_for_assign_updates(use_two_phase_put_for_attribute(*attribute_in))
+{
+}
+
namespace {
void
@@ -340,10 +356,14 @@ private:
std::promise<std::unique_ptr<PrepareResult>> _result_promise;
public:
- PreparePutTask(SerialNum serial_num_in,
- uint32_t docid_in,
+ PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
const AttributeWriter::WriteField& field,
std::shared_ptr<DocumentFieldExtractor> field_extractor);
+ PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
+ AttributeVector& attr,
+ const FieldValue& field_value);
~PreparePutTask() override;
void run() override;
SerialNum serial_num() const { return _serial_num; }
@@ -355,12 +375,12 @@ public:
}
};
-PreparePutTask::PreparePutTask(SerialNum serial_num_in,
- uint32_t docid_in,
+PreparePutTask::PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
const AttributeWriter::WriteField& field,
std::shared_ptr<DocumentFieldExtractor> field_extractor)
- : _serial_num(serial_num_in),
- _docid(docid_in),
+ : _serial_num(serial_num),
+ _docid(docid),
_attr(field.getAttribute()),
_field_value(),
_result_promise()
@@ -370,6 +390,18 @@ PreparePutTask::PreparePutTask(SerialNum serial_num_in,
_field_value.reset(value.release());
}
+PreparePutTask::PreparePutTask(SerialNum serial_num,
+ uint32_t docid,
+ AttributeVector& attr,
+ const FieldValue& field_value)
+ : _serial_num(serial_num),
+ _docid(docid),
+ _attr(attr),
+ _field_value(field_value.clone()),
+ _result_promise()
+{
+}
+
PreparePutTask::~PreparePutTask() = default;
void
@@ -610,7 +642,7 @@ AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr)
void AttributeWriter::setupAttriuteMapping() {
for (auto attr : getWritableAttributes()) {
vespalib::stringref name = attr->getName();
- _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix()));
+ _attrMap[name] = AttributeWithInfo(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix()));
}
}
@@ -664,6 +696,25 @@ AttributeWriter::remove(const LidVector &lidsToRemove, SerialNum serialNum, OnWr
}
}
+namespace {
+
+bool
+is_single_assign_update(const FieldUpdate& update)
+{
+ return (update.getUpdates().size() == 1) &&
+ (update.getUpdates()[0]->getType() == ValueUpdate::Assign) &&
+ (static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]).hasValue());
+}
+
+const FieldValue&
+get_single_assign_update_field_value(const FieldUpdate& update)
+{
+ const auto& assign = static_cast<const AssignValueUpdate &>(*update.getUpdates()[0]);
+ return assign.getValue();
+}
+
+}
+
void
AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, DocumentIdT lid,
OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate)
@@ -679,8 +730,8 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
for (const auto &fupd : upd.getUpdates()) {
LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().data());
- auto found = _attrMap.find(fupd.getField().getName());
- AttributeVector * attrp = (found != _attrMap.end()) ? found->second.first : nullptr;
+ auto itr = _attrMap.find(fupd.getField().getName());
+ AttributeVector * attrp = (itr != _attrMap.end()) ? itr->second.attribute : nullptr;
onUpdate.onUpdateField(fupd.getField().getName(), attrp);
if (__builtin_expect(attrp == nullptr, false)) {
LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().data());
@@ -688,10 +739,21 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
}
// TODO: Check if we must use > due to multiple entries for same
// document and attribute.
- if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false))
+ if (__builtin_expect(attrp->getStatus().getLastSyncToken() >= serialNum, false)) {
continue;
- args[found->second.second.getId()]->_updates.emplace_back(attrp, &fupd);
- LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
+ }
+ if (itr->second.use_two_phase_put_for_assign_updates &&
+ is_single_assign_update(fupd)) {
+ auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, *attrp, get_single_assign_update_field_value(fupd));
+ auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone);
+ LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'",
+ lid, attrp->getName().c_str());
+ _shared_executor.execute(std::move(prepare_task));
+ _attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task));
+ } else {
+ args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd);
+ LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
+ }
}
// NOTE: The lifetime of the field update will be ensured by keeping the document update alive
// in a operation done context object.
@@ -708,8 +770,8 @@ void
AttributeWriter::heartBeat(SerialNum serialNum)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [serialNum, attr=entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [serialNum, attr=entry.second.attribute]()
{ applyHeartBeat(serialNum, *attr); });
}
}
@@ -737,8 +799,8 @@ void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [docIdLimit, attr = entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [docIdLimit, attr = entry.second.attribute]()
{ applyReplayDone(docIdLimit, *attr); });
}
_attributeFieldWriter.sync();
@@ -749,8 +811,8 @@ void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.second,
- [wantedLidLimit, serialNum, attr=entry.second.first]()
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [wantedLidLimit, serialNum, attr=entry.second.attribute]()
{ applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
}
_attributeFieldWriter.sync();
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 1c55f53aa5e..933de1a6c14 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -67,9 +67,18 @@ public:
bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; }
bool use_two_phase_put() const { return _use_two_phase_put; }
};
+
+ struct AttributeWithInfo {
+ search::AttributeVector* attribute;
+ ExecutorId executor_id;
+ bool use_two_phase_put_for_assign_updates;
+
+ AttributeWithInfo();
+ AttributeWithInfo(search::AttributeVector* attribute_in,
+ ExecutorId executor_id_in);
+ };
private:
- using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>;
- using AttrMap = vespalib::hash_map<vespalib::string, AttrWithId>;
+ using AttrMap = vespalib::hash_map<vespalib::string, AttributeWithInfo>;
std::vector<WriteContext> _writeContexts;
const DataType *_dataType;
bool _hasStructFieldAttribute;