diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-06-15 16:32:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-15 16:32:27 +0200 |
commit | 6ee498d929b54202ea4b8881336bc6aa6dda7aa4 (patch) | |
tree | a856b96b1b8293d305b7fa0ba34e92b742f2e0ac | |
parent | c967eb26948e0444664322d22f7994fed71ff191 (diff) | |
parent | a507d071f8c5f5922351e55fc23a7b12ab9c3968 (diff) |
Merge pull request #13592 from vespa-engine/geirst/two-phase-put-in-attribute-writer
Two phase put in attribute writer
17 files changed, 551 insertions, 133 deletions
diff --git a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h index 3e3683ce60f..c8b196023d6 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h +++ b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h @@ -16,24 +16,29 @@ private: uint32_t _neighbors_to_explore_at_insert; // This is always the same as in the attribute config, and is duplicated here to simplify usage. DistanceMetric _distance_metric; + bool _allow_multi_threaded_indexing; public: HnswIndexParams(uint32_t max_links_per_node_in, uint32_t neighbors_to_explore_at_insert_in, - DistanceMetric distance_metric_in) + DistanceMetric distance_metric_in, + bool allow_multi_threaded_indexing_in = false) : _max_links_per_node(max_links_per_node_in), _neighbors_to_explore_at_insert(neighbors_to_explore_at_insert_in), - _distance_metric(distance_metric_in) + _distance_metric(distance_metric_in), + _allow_multi_threaded_indexing(allow_multi_threaded_indexing_in) {} uint32_t max_links_per_node() const { return _max_links_per_node; } uint32_t neighbors_to_explore_at_insert() const { return _neighbors_to_explore_at_insert; } DistanceMetric distance_metric() const { return _distance_metric; } + bool allow_multi_threaded_indexing() const { return _allow_multi_threaded_indexing; } bool operator==(const HnswIndexParams& rhs) const { return (_max_links_per_node == rhs._max_links_per_node && _neighbors_to_explore_at_insert == rhs._neighbors_to_explore_at_insert && - _distance_metric == rhs._distance_metric); + _distance_metric == rhs._distance_metric && + _allow_multi_threaded_indexing == rhs._allow_multi_threaded_indexing); } }; diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.cpp b/searchcommon/src/vespa/searchcommon/attribute/status.cpp index da13548ec2e..f2bb49c348a 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.cpp +++ b/searchcommon/src/vespa/searchcommon/attribute/status.cpp @@ -20,6 +20,42 @@ Status::Status() { } +Status::Status(const Status& rhs) + : _numDocs(rhs._numDocs), + _numValues(rhs._numValues), + _numUniqueValues(rhs._numUniqueValues), + _allocated(rhs._allocated), + _used(rhs._used), + _dead(rhs._dead), + _unused(rhs._unused), + _onHold(rhs._onHold), + _onHoldMax(rhs._onHoldMax), + _lastSyncToken(rhs.getLastSyncToken()), + _updates(rhs._updates), + _nonIdempotentUpdates(rhs._nonIdempotentUpdates), + _bitVectors(rhs._bitVectors) +{ +} + +Status& +Status::operator=(const Status& rhs) +{ + _numDocs = rhs._numDocs; + _numValues = rhs._numValues; + _numUniqueValues = rhs._numUniqueValues; + _allocated = rhs._allocated; + _used = rhs._used; + _dead = rhs._dead; + _unused = rhs._unused; + _onHold = rhs._onHold; + _onHoldMax = rhs._onHoldMax; + setLastSyncToken(rhs.getLastSyncToken()); + _updates = rhs._updates; + _nonIdempotentUpdates = rhs._nonIdempotentUpdates; + _bitVectors = rhs._bitVectors; + return *this; +} + vespalib::string Status::createName(vespalib::stringref index, vespalib::stringref attr) { diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.h b/searchcommon/src/vespa/searchcommon/attribute/status.h index 888355b3f58..a624309da65 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.h +++ b/searchcommon/src/vespa/searchcommon/attribute/status.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/vespalib/stllike/string.h> +#include <atomic> namespace search::attribute { @@ -10,6 +11,8 @@ class Status { public: Status(); + Status(const Status& rhs); + Status& operator=(const Status& rhs); void updateStatistics(uint64_t numValues, uint64_t numUniqueValue, uint64_t allocated, uint64_t used, uint64_t dead, uint64_t onHold); @@ -22,14 +25,15 @@ public: uint64_t getDead() const { return _dead; } uint64_t getOnHold() const { return _onHold; } uint64_t getOnHoldMax() const { return _onHoldMax; } - uint64_t getLastSyncToken() const { return _lastSyncToken; } + // This might be accessed from other threads than the writer thread. + uint64_t getLastSyncToken() const { return _lastSyncToken.load(std::memory_order_relaxed); } uint64_t getUpdateCount() const { return _updates; } uint64_t getNonIdempotentUpdateCount() const { return _nonIdempotentUpdates; } uint32_t getBitVectors() const { return _bitVectors; } void setNumDocs(uint64_t v) { _numDocs = v; } void incNumDocs() { ++_numDocs; } - void setLastSyncToken(uint64_t v) { _lastSyncToken = v; } + void setLastSyncToken(uint64_t v) { _lastSyncToken.store(v, std::memory_order_relaxed); } void incUpdates(uint64_t v=1) { _updates += v; } void incNonIdempotentUpdates(uint64_t v = 1) { _nonIdempotentUpdates += v; } void incBitVectors() { ++_bitVectors; } @@ -47,7 +51,7 @@ private: uint64_t _unused; uint64_t _onHold; uint64_t _onHoldMax; - uint64_t _lastSyncToken; + std::atomic<uint64_t> _lastSyncToken; uint64_t _updates; uint64_t _nonIdempotentUpdates; uint32_t _bitVectors; diff --git a/searchcommon/src/vespa/searchcommon/common/schema.cpp b/searchcommon/src/vespa/searchcommon/common/schema.cpp index a21cc43572e..c59edbef22f 100644 --- a/searchcommon/src/vespa/searchcommon/common/schema.cpp +++ b/searchcommon/src/vespa/searchcommon/common/schema.cpp @@ -70,16 +70,20 @@ namespace index { const uint32_t Schema::UNKNOWN_FIELD_ID(std::numeric_limits<uint32_t>::max()); Schema::Field::Field(vespalib::stringref n, DataType dt) - : _name(n), - _dataType(dt), - _collectionType(schema::CollectionType::SINGLE) + : Field(n, dt, schema::CollectionType::SINGLE, "") { } Schema::Field::Field(vespalib::stringref n, DataType dt, CollectionType ct) + : Field(n, dt, ct, "") +{ +} + +Schema::Field::Field(vespalib::stringref n, DataType dt, CollectionType ct, vespalib::stringref tensor_spec) : _name(n), _dataType(dt), - _collectionType(ct) + _collectionType(ct), + _tensor_spec(tensor_spec) { } @@ -111,15 +115,14 @@ Schema::Field::operator==(const Field &rhs) const { return _name == rhs._name && _dataType == rhs._dataType && - _collectionType == rhs._collectionType; + _collectionType == rhs._collectionType && + _tensor_spec == rhs._tensor_spec; } bool Schema::Field::operator!=(const Field &rhs) const { - return _name != rhs._name || - _dataType != rhs._dataType || - _collectionType != rhs._collectionType; + return !((*this) == rhs); } Schema::IndexField::IndexField(vespalib::stringref name, DataType dt) diff --git a/searchcommon/src/vespa/searchcommon/common/schema.h b/searchcommon/src/vespa/searchcommon/common/schema.h index e17d219d7e8..9003578adaf 100644 --- a/searchcommon/src/vespa/searchcommon/common/schema.h +++ b/searchcommon/src/vespa/searchcommon/common/schema.h @@ -35,10 +35,12 @@ public: vespalib::string _name; DataType _dataType; CollectionType _collectionType; + vespalib::string _tensor_spec; public: Field(vespalib::stringref n, DataType dt); Field(vespalib::stringref n, DataType dt, CollectionType ct); + Field(vespalib::stringref n, DataType dt, CollectionType ct, vespalib::stringref tensor_spec); /** * Create this field based on the given config lines. @@ -58,6 +60,7 @@ public: const vespalib::string &getName() const { return _name; } DataType getDataType() const { return _dataType; } CollectionType getCollectionType() const { return _collectionType; } + const vespalib::string& get_tensor_spec() const { return _tensor_spec; } bool matchingTypes(const Field &rhs) const { return getDataType() == rhs.getDataType() && diff --git a/searchcore/src/tests/proton/attribute/CMakeLists.txt b/searchcore/src/tests/proton/attribute/CMakeLists.txt index 1f385628480..c23d97c6e88 100644 --- a/searchcore/src/tests/proton/attribute/CMakeLists.txt +++ b/searchcore/src/tests/proton/attribute/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_executable(searchcore_attribute_test_app TEST searchcore_attribute searchcore_flushengine searchcore_pcommon + searchlib_test gtest ) vespa_add_test(NAME searchcore_attribute_test_app COMMAND searchcore_attribute_test_app) diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 6284fe119c4..feebe63f01a 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -19,6 +19,7 @@ #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/test/attribute_utils.h> +#include <vespa/searchcore/proton/test/mock_attribute_manager.h> #include <vespa/searchcorespi/flush/iflushtarget.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> #include <vespa/searchlib/attribute/attributefactory.h> @@ -33,6 +34,7 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/predicate/predicate_hash.h> #include <vespa/searchlib/predicate/predicate_index.h> +#include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/test/directory_handler.h> #include <vespa/vespalib/btree/btreeroot.hpp> @@ -57,8 +59,11 @@ using namespace vespa::config::search; using proton::ImportedAttributesRepo; using proton::test::AttributeUtils; +using proton::test::MockAttributeManager; using search::TuneFileAttributes; using search::attribute::BitVectorSearchCache; +using search::attribute::DistanceMetric; +using search::attribute::HnswIndexParams; using search::attribute::IAttributeVector; using search::attribute::ImportedAttributeVector; using search::attribute::ImportedAttributeVectorFactory; @@ -67,6 +72,8 @@ using search::index::DummyFileHeaderContext; using search::index::schema::CollectionType; using search::predicate::PredicateHash; using search::predicate::PredicateIndex; +using search::tensor::DenseTensorAttribute; +using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::test::DirectoryHandler; using std::string; @@ -118,20 +125,16 @@ const std::shared_ptr<IDestructorCallback> emptyCallback; class AttributeWriterTest : public ::testing::Test { public: DirectoryHandler _dirHandler; - DummyFileHeaderContext _fileHeaderContext; std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal; std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter; - HwInfo _hwInfo; - proton::AttributeManager::SP _m; + std::shared_ptr<MockAttributeManager> _mgr; std::unique_ptr<AttributeWriter> _aw; AttributeWriterTest() : _dirHandler(test_dir), - _fileHeaderContext(), _attributeFieldWriterReal(), _attributeFieldWriter(), - _hwInfo(), - _m(), + _mgr(), _aw() { setup(1); @@ -141,21 +144,26 @@ public: _aw.reset(); _attributeFieldWriterReal = std::make_unique<ForegroundTaskExecutor>(threads); _attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal); - _m = std::make_shared<proton::AttributeManager>(test_dir, "test.subdb", TuneFileAttributes(), - _fileHeaderContext, *_attributeFieldWriter, _hwInfo); + _mgr = std::make_shared<MockAttributeManager>(); + _mgr->set_writer(*_attributeFieldWriter); allocAttributeWriter(); } void allocAttributeWriter() { - _aw = std::make_unique<AttributeWriter>(_m); + _aw = std::make_unique<AttributeWriter>(_mgr); } AttributeVector::SP addAttribute(const vespalib::string &name) { - return addAttribute({name, AVConfig(AVBasicType::INT32)}, createSerialNum); + return addAttribute({name, AVConfig(AVBasicType::INT32)}); } - AttributeVector::SP addAttribute(const AttributeSpec &spec, SerialNum serialNum) { - auto ret = _m->addAttribute(spec, serialNum); + AttributeVector::SP addAttribute(const AttributeSpec &spec) { + auto ret = _mgr->addAttribute(spec.getName(), + AttributeFactory::createAttribute(spec.getName(), spec.getConfig())); allocAttributeWriter(); return ret; } + void add_attribute(AttributeVector::SP attr) { + _mgr->addAttribute(attr->getName(), std::move(attr)); + allocAttributeWriter(); + } void put(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit = true) { _aw->put(serialNum, doc, lid, immediateCommit, emptyCallback); @@ -195,9 +203,9 @@ TEST_F(AttributeWriterTest, handles_put) DocBuilder idb(s); auto a1 = addAttribute("a1"); - auto a2 = addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); - auto a3 = addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}, createSerialNum); - auto a4 = addAttribute({"a4", AVConfig(AVBasicType::STRING)}, createSerialNum); + auto a2 = addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + auto a3 = addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}); + auto a4 = addAttribute({"a4", AVConfig(AVBasicType::STRING)}); attribute::IntegerContent ibuf; attribute::FloatContent fbuf; @@ -275,7 +283,7 @@ TEST_F(AttributeWriterTest, handles_predicate_put) s.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); DocBuilder idb(s); - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); PredicateIndex &index = static_cast<PredicateAttribute &>(*a1).getIndex(); @@ -369,7 +377,7 @@ verifyAttributeContent(const AttributeVector & v, uint32_t lid, vespalib::string TEST_F(AttributeWriterTest, visibility_delay_is_honoured) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::STRING)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::STRING)}); Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::STRING, CollectionType::SINGLE)); DocBuilder idb(s); @@ -381,7 +389,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) put(3, *doc, 1); EXPECT_EQ(2u, a1->getNumDocs()); EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayed(_m); + AttributeWriter awDelayed(_mgr); awDelayed.put(4, *doc, 2, false, emptyCallback); EXPECT_EQ(3u, a1->getNumDocs()); EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); @@ -391,7 +399,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) awDelayed.forceCommit(6, emptyCallback); EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayedShort(_m); + AttributeWriter awDelayedShort(_mgr); awDelayedShort.put(7, *doc, 2, false, emptyCallback); EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); awDelayedShort.put(8, *doc, 2, false, emptyCallback); @@ -414,7 +422,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) TEST_F(AttributeWriterTest, handles_predicate_remove) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); Schema s; s.addAttributeField( Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); @@ -477,7 +485,7 @@ TEST_F(AttributeWriterTest, handles_update) TEST_F(AttributeWriterTest, handles_predicate_update) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); Schema schema; schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); @@ -624,18 +632,20 @@ Tensor::UP make_tensor(const TensorSpec &spec) { return Tensor::UP(dynamic_cast<Tensor*>(tensor.release())); } +const vespalib::string sparse_tensor = "tensor(x{},y{})"; + AttributeVector::SP createTensorAttribute(AttributeWriterTest &t) { AVConfig cfg(AVBasicType::TENSOR); - cfg.setTensorType(ValueType::from_spec("tensor(x{},y{})")); - auto ret = t.addAttribute({"a1", cfg}, createSerialNum); + cfg.setTensorType(ValueType::from_spec(sparse_tensor)); + auto ret = t.addAttribute({"a1", cfg}); return ret; } Schema -createTensorSchema() { +createTensorSchema(const vespalib::string& tensor_spec = sparse_tensor) { Schema schema; - schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE)); + schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE, tensor_spec)); return schema; } @@ -653,7 +663,7 @@ TEST_F(AttributeWriterTest, can_write_to_tensor_attribute) auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "4"}, {"y", "5"}}, 7)); Document::UP doc = createTensorPutDoc(builder, *tensor); put(1, *doc, 1); @@ -670,7 +680,7 @@ TEST_F(AttributeWriterTest, handles_tensor_assign_update) auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "6"}, {"y", "7"}}, 9)); auto doc = createTensorPutDoc(builder, *tensor); put(1, *doc, 1); @@ -683,9 +693,9 @@ TEST_F(AttributeWriterTest, handles_tensor_assign_update) const document::DocumentType &dt(builder.getDocumentType()); DocumentUpdate upd(*builder.getDocumentTypeRepo(), dt, DocumentId("id:ns:searchdocument::1")); - auto new_tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto new_tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "8"}, {"y", "9"}}, 11)); - TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec("tensor(x{},y{})")); + TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec(sparse_tensor)); TensorFieldValue new_value(xySparseTensorDataType); new_value = new_tensor->clone(); upd.addUpdate(FieldUpdate(upd.getType().getField("a1")) @@ -724,9 +734,9 @@ putAttributes(AttributeWriterTest &t, std::vector<uint32_t> expExecuteHistory) DocBuilder idb(s); - AttributeVector::SP a1 = t.addAttribute("a1"); - AttributeVector::SP a2 = t.addAttribute("a2"); - AttributeVector::SP a3 = t.addAttribute("a3"); + auto a1 = t.addAttribute("a1"); + auto a2 = t.addAttribute("a2"); + auto a3 = t.addAttribute("a3"); EXPECT_EQ(1u, a1->getNumDocs()); EXPECT_EQ(1u, a2->getNumDocs()); @@ -761,6 +771,106 @@ TEST_F(AttributeWriterTest, spreads_write_over_3_write_contexts) putAttributes(*this, {0, 1, 2}); } +struct MockPrepareResult : public PrepareResult { + uint32_t docid; + const Tensor& tensor; + MockPrepareResult(uint32_t docid_in, const Tensor& tensor_in) : docid(docid_in), tensor(tensor_in) {} +}; + +class MockDenseTensorAttribute : public DenseTensorAttribute { +public: + mutable size_t prepare_set_tensor_cnt; + mutable size_t complete_set_tensor_cnt; + + MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg) + : DenseTensorAttribute(name, cfg), + prepare_set_tensor_cnt(0), + complete_set_tensor_cnt(0) + {} + std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Tensor& tensor) const override { + ++prepare_set_tensor_cnt; + return std::make_unique<MockPrepareResult>(docid, tensor); + } + + virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::unique_ptr<PrepareResult> prepare_result) override { + ++complete_set_tensor_cnt; + assert(prepare_result); + auto* mock_result = dynamic_cast<MockPrepareResult*>(prepare_result.get()); + assert(mock_result); + EXPECT_EQ(docid, mock_result->docid); + EXPECT_EQ(tensor, mock_result->tensor); + } +}; + +const vespalib::string dense_tensor = "tensor(x[2])"; + +AVConfig +get_tensor_config(bool allow_multi_threaded_indexing) +{ + AVConfig cfg(AVBasicType::TENSOR); + cfg.setTensorType(ValueType::from_spec(dense_tensor)); + cfg.set_hnsw_index_params(HnswIndexParams(4, 4, DistanceMetric::Euclidean, allow_multi_threaded_indexing)); + return cfg; +} + +std::shared_ptr<MockDenseTensorAttribute> +make_mock_tensor_attribute(const vespalib::string& name, bool allow_multi_threaded_indexing) +{ + auto cfg = get_tensor_config(allow_multi_threaded_indexing); + return std::make_shared<MockDenseTensorAttribute>(name, cfg); +} + +TEST_F(AttributeWriterTest, tensor_attributes_using_two_phase_put_are_in_separate_write_contexts) +{ + addAttribute("a1"); + addAttribute({"t1", get_tensor_config(true)}); + addAttribute({"t2", get_tensor_config(true)}); + addAttribute({"t3", get_tensor_config(false)}); + allocAttributeWriter(); + + const auto& ctx = _aw->get_write_contexts(); + EXPECT_EQ(3, ctx.size()); + EXPECT_FALSE(ctx[0].use_two_phase_put()); + EXPECT_EQ(2, ctx[0].getFields().size()); + + EXPECT_TRUE(ctx[1].use_two_phase_put()); + EXPECT_EQ(1, ctx[1].getFields().size()); + EXPECT_EQ("t1", ctx[1].getFields()[0].getAttribute().getName()); + + EXPECT_TRUE(ctx[2].use_two_phase_put()); + EXPECT_EQ(1, ctx[2].getFields().size()); + EXPECT_EQ("t2", ctx[2].getFields()[0].getAttribute().getName()); +} + +TEST_F(AttributeWriterTest, handles_put_in_two_phases_when_specified_for_tensor_attribute) +{ + setup(2); + auto a1 = make_mock_tensor_attribute("a1", true); + add_attribute(a1); + Schema schema = createTensorSchema(dense_tensor); + DocBuilder builder(schema); + auto tensor = make_tensor(TensorSpec(dense_tensor) + .add({{"x", 0}}, 3).add({{"x", 1}}, 5)); + auto doc = createTensorPutDoc(builder, *tensor); + + put(1, *doc, 1); + EXPECT_EQ(1, a1->prepare_set_tensor_cnt); + EXPECT_EQ(1, a1->complete_set_tensor_cnt); + assertExecuteHistory({1, 0}); + + put(2, *doc, 2); + EXPECT_EQ(2, a1->prepare_set_tensor_cnt); + EXPECT_EQ(2, a1->complete_set_tensor_cnt); + assertExecuteHistory({1, 0, 0, 0}); + + put(3, *doc, 3); + EXPECT_EQ(3, a1->prepare_set_tensor_cnt); + EXPECT_EQ(3, a1->complete_set_tensor_cnt); + // Note that the prepare step is executed round-robin between the 2 threads. + assertExecuteHistory({1, 0, 0, 0, 1, 0}); +} + + ImportedAttributeVector::SP createImportedAttribute(const vespalib::string &name) { @@ -785,10 +895,10 @@ createImportedAttributesRepo() TEST_F(AttributeWriterTest, forceCommit_clears_search_cache_in_imported_attribute_vectors) { - _m->setImportedAttributes(createImportedAttributesRepo()); + _mgr->setImportedAttributes(createImportedAttributesRepo()); commit(10); - EXPECT_EQ(0u, _m->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); - EXPECT_EQ(0u, _m->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); + EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); + EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); } class StructWriterTestBase : public AttributeWriterTest { @@ -803,7 +913,7 @@ public: _valueField("value", 2, *DataType::INT, true), _structFieldType("struct") { - addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}, createSerialNum); + addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}); _type.addField(_valueField); _structFieldType.addField(_valueField); } @@ -837,7 +947,7 @@ public: _structArrayFieldType(_structFieldType), _structArrayField("array", _structArrayFieldType, true) { - addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); _type.addField(_structArrayField); } ~StructArrayWriterTest(); @@ -853,8 +963,8 @@ public: return doc; } void checkAttrs(uint32_t lid, int32_t value, const std::vector<int32_t> &arrayValues) { - auto valueAttr = _m->getAttribute("value")->getSP(); - auto arrayValueAttr = _m->getAttribute("array.value")->getSP(); + auto valueAttr = _mgr->getAttribute("value")->getSP(); + auto arrayValueAttr = _mgr->getAttribute("array.value")->getSP(); EXPECT_EQ(value, valueAttr->getInt(lid)); attribute::IntegerContent ibuf; ibuf.fill(*arrayValueAttr, lid); @@ -888,8 +998,8 @@ public: _structMapFieldType(*DataType::INT, _structFieldType), _structMapField("map", _structMapFieldType, true) { - addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); - addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); _type.addField(_structMapField); } @@ -905,9 +1015,9 @@ public: } void checkAttrs(uint32_t lid, int32_t expValue, const std::map<int32_t, int32_t> &expMap) { - auto valueAttr = _m->getAttribute("value")->getSP(); - auto mapKeyAttr = _m->getAttribute("map.key")->getSP(); - auto mapValueAttr = _m->getAttribute("map.value.value")->getSP(); + auto valueAttr = _mgr->getAttribute("value")->getSP(); + auto mapKeyAttr = _mgr->getAttribute("map.key")->getSP(); + auto mapValueAttr = _mgr->getAttribute("map.value.value")->getSP(); EXPECT_EQ(expValue, valueAttr->getInt(lid)); attribute::IntegerContent mapKeys; mapKeys.fill(*mapKeyAttr, lid); diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index e1785e1e48d..b6d6d2437d8 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -662,10 +662,11 @@ void addField(Schema & s, const std::string &name, Schema::DataType dtype, - Schema::CollectionType ctype) + Schema::CollectionType ctype, + const std::string& tensor_spec = "") { - s.addSummaryField(Schema::SummaryField(name, dtype, ctype)); - s.addAttributeField(Schema::AttributeField(name, dtype, ctype)); + s.addSummaryField(Schema::SummaryField(name, dtype, ctype, tensor_spec)); + s.addAttributeField(Schema::AttributeField(name, dtype, ctype, tensor_spec)); } @@ -682,7 +683,7 @@ Test::requireThatAttributesAreUsed() addField(s, "bg", schema::DataType::INT32, CollectionType::WEIGHTEDSET); addField(s, "bh", schema::DataType::FLOAT, CollectionType::WEIGHTEDSET); addField(s, "bi", schema::DataType::STRING, CollectionType::WEIGHTEDSET); - addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE); + addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})"); BuildContext bc(s); DBContext dc(bc._repo, getDocTypeName()); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 18235116d27..d1d08a332e3 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -285,8 +285,8 @@ SchemaContext::SchemaContext() : schema(new Schema()), builder() { - schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE)); - schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE)); + schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); + schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); addField("i1"); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 33b9d162163..8f19d5c203b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -12,26 +12,48 @@ #include <vespa/searchcore/proton/common/attribute_updater.h> #include <vespa/searchlib/attribute/attributevector.hpp> #include <vespa/searchlib/attribute/imported_attribute_vector.h> +#include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <future> #include <vespa/log/log.h> LOG_SETUP(".proton.attribute.attribute_writer"); using namespace document; using namespace search; + +using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; +using search::tensor::PrepareResult; using vespalib::ISequencedTaskExecutor; -using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; namespace proton { using LidVector = LidVectorContext::LidVector; +namespace { + +bool +use_two_phase_put_for_attribute(const AttributeVector& attr) +{ + const auto& cfg = attr.getConfig(); + if (cfg.basicType() == search::attribute::BasicType::Type::TENSOR && + cfg.hnsw_index_params().has_value() && + cfg.hnsw_index_params().value().allow_multi_threaded_indexing()) + { + return true; + } + return false; +} + +} + AttributeWriter::WriteField::WriteField(AttributeVector &attribute) : _fieldPath(), _attribute(attribute), - _structFieldAttribute(false) + _structFieldAttribute(false), + _use_two_phase_put(use_two_phase_put_for_attribute(attribute)) { const vespalib::string &name = attribute.getName(); _structFieldAttribute = attribute::isStructFieldAttribute(name); @@ -57,11 +79,11 @@ AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) AttributeWriter::WriteContext::WriteContext(ExecutorId executorId) : _executorId(executorId), _fields(), - _hasStructFieldAttribute(false) + _hasStructFieldAttribute(false), + _use_two_phase_put(false) { } - AttributeWriter::WriteContext::WriteContext(WriteContext &&rhs) noexcept = default; AttributeWriter::WriteContext::~WriteContext() = default; @@ -75,6 +97,13 @@ AttributeWriter::WriteContext::add(AttributeVector &attr) if (_fields.back().isStructFieldAttribute()) { _hasStructFieldAttribute = true; } + if (_fields.back().use_two_phase_put()) { + // Only support for one field per context when this is true. + assert(_fields.size() == 1); + _use_two_phase_put = true; + } else { + assert(!_use_two_phase_put); + } } void @@ -113,6 +142,27 @@ applyPutToAttribute(SerialNum serialNum, const FieldValue::UP &fieldValue, Docum } void +complete_put_to_attribute(SerialNum serial_num, + uint32_t docid, + AttributeVector& attr, + const FieldValue::SP& field_value, + std::future<std::unique_ptr<PrepareResult>>& result_future, + bool immediate_commit, + AttributeWriter::OnWriteDoneType) +{ + ensureLidSpace(serial_num, docid, attr); + if (field_value.get()) { + auto result = result_future.get(); + AttributeUpdater::complete_set_value(attr, docid, *field_value, std::move(result)); + } else { + attr.clearDoc(docid); + } + if (immediate_commit) { + attr.commit(serial_num, serial_num); + } +} + +void applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommit, AttributeVector &attr, AttributeWriter::OnWriteDoneType) { @@ -148,7 +198,6 @@ applyReplayDone(uint32_t docIdLimit, AttributeVector &attr) attr.shrinkLidSpace(); } - void applyHeartBeat(SerialNum serialNum, AttributeVector &attr) { @@ -166,7 +215,6 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVec } } - void applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr) { @@ -208,7 +256,6 @@ struct BatchUpdateTask : public vespalib::Executor::Task { } } - SerialNum _serialNum; DocumentIdT _lid; bool _immediateCommit; @@ -221,6 +268,7 @@ class FieldContext vespalib::string _name; ExecutorId _executorId; AttributeVector *_attr; + bool _use_two_phase_put; public: FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr); @@ -228,13 +276,14 @@ public: bool operator<(const FieldContext &rhs) const; ExecutorId getExecutorId() const { return _executorId; } AttributeVector *getAttribute() const { return _attr; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; - FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr) : _name(attr->getName()), _executorId(writer.getExecutorId(attr->getNamePrefix())), - _attr(attr) + _attr(attr), + _use_two_phase_put(use_two_phase_put_for_attribute(*attr)) { } @@ -303,6 +352,100 @@ PutTask::run() } } + +class PreparePutTask : public vespalib::Executor::Task { +private: + const SerialNum _serial_num; + const uint32_t _docid; + AttributeVector& _attr; + FieldValue::SP _field_value; + std::promise<std::unique_ptr<PrepareResult>> _result_promise; + +public: + PreparePutTask(SerialNum serial_num_in, + uint32_t docid_in, + const AttributeWriter::WriteField& field, + std::shared_ptr<DocumentFieldExtractor> field_extractor); + ~PreparePutTask() override; + void run() override; + SerialNum serial_num() const { return _serial_num; } + uint32_t docid() const { return _docid; } + AttributeVector& attr() { return _attr; } + FieldValue::SP field_value() { return _field_value; } + std::future<std::unique_ptr<PrepareResult>> result_future() { + return _result_promise.get_future(); + } +}; + +PreparePutTask::PreparePutTask(SerialNum serial_num_in, + uint32_t docid_in, + const AttributeWriter::WriteField& field, + std::shared_ptr<DocumentFieldExtractor> field_extractor) + : _serial_num(serial_num_in), + _docid(docid_in), + _attr(field.getAttribute()), + _field_value(), + _result_promise() +{ + // Note: No need to store the field extractor as we are not extracting struct fields. + auto value = field_extractor->getFieldValue(field.getFieldPath()); + _field_value.reset(value.release()); +} + +PreparePutTask::~PreparePutTask() = default; + +void +PreparePutTask::run() +{ + if (_attr.getStatus().getLastSyncToken() < _serial_num) { + if (_field_value.get()) { + _result_promise.set_value(AttributeUpdater::prepare_set_value(_attr, _docid, *_field_value)); + } + } +} + +class CompletePutTask : public vespalib::Executor::Task { +private: + const SerialNum _serial_num; + const uint32_t _docid; + AttributeVector& _attr; + FieldValue::SP _field_value; + std::future<std::unique_ptr<PrepareResult>> _result_future; + const bool _immediate_commit; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _on_write_done; + +public: + CompletePutTask(PreparePutTask& prepare_task, + bool immediate_commit, + AttributeWriter::OnWriteDoneType on_write_done); + ~CompletePutTask() override; + void run() override; +}; + +CompletePutTask::CompletePutTask(PreparePutTask& prepare_task, + bool immediate_commit, + AttributeWriter::OnWriteDoneType on_write_done) + : _serial_num(prepare_task.serial_num()), + _docid(prepare_task.docid()), + _attr(prepare_task.attr()), + _field_value(prepare_task.field_value()), + _result_future(prepare_task.result_future()), + _immediate_commit(immediate_commit), + _on_write_done(on_write_done) +{ +} + +CompletePutTask::~CompletePutTask() = default; + +void +CompletePutTask::run() +{ + if (_attr.getStatus().getLastSyncToken() < _serial_num) { + complete_put_to_attribute(_serial_num, _docid, _attr, _field_value, _result_future, + _immediate_commit, _on_write_done); + } +} + class RemoveTask : public vespalib::Executor::Task { const AttributeWriter::WriteContext &_wc; @@ -316,7 +459,6 @@ public: void run() override; }; - RemoveTask::RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone) : _wc(wc), _serialNum(serialNum), @@ -419,13 +561,22 @@ AttributeWriter::setupWriteContexts() fieldContexts.emplace_back(_attributeFieldWriter, attr); } std::sort(fieldContexts.begin(), fieldContexts.end()); - for (auto &fc : fieldContexts) { + for (const auto& fc : fieldContexts) { + if (fc.use_two_phase_put()) { + continue; + } if (_writeContexts.empty() || (_writeContexts.back().getExecutorId() != fc.getExecutorId())) { _writeContexts.emplace_back(fc.getExecutorId()); } _writeContexts.back().add(*fc.getAttribute()); } + for (const auto& fc : fieldContexts) { + if (fc.use_two_phase_put()) { + _writeContexts.emplace_back(fc.getExecutorId()); + _writeContexts.back().add(*fc.getAttribute()); + } + } for (const auto &wc : _writeContexts) { if (wc.hasStructFieldAttribute()) { _hasStructFieldAttribute = true; @@ -452,9 +603,19 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI } auto extractor = std::make_shared<DocumentFieldExtractor>(doc); for (const auto &wc : _writeContexts) { - if (allAttributes || wc.hasStructFieldAttribute()) { - auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, onWriteDone); - _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); + if (wc.use_two_phase_put()) { + assert(wc.getFields().size() == 1); + auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor); + auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone); + // We use the local docid to create an executor id to round-robin between the threads. + _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task)); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); + } else { + if (allAttributes || wc.hasStructFieldAttribute()) { + auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, + onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); + } } } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 4a9726dd113..726379220e3 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -19,20 +19,23 @@ namespace proton { class AttributeWriter : public IAttributeWriter { private: - typedef search::AttributeVector AttributeVector; - typedef document::FieldPath FieldPath; - typedef document::DataType DataType; - typedef document::DocumentType DocumentType; - typedef document::FieldValue FieldValue; + using AttributeVector = search::AttributeVector; + using FieldPath = document::FieldPath; + using DataType = document::DataType; + using DocumentType = document::DocumentType; + using FieldValue = document::FieldValue; const IAttributeManager::SP _mgr; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; public: - class WriteField - { + /** + * Represents an attribute vector for a field and details about how to write to it. + */ + class WriteField { FieldPath _fieldPath; AttributeVector &_attribute; bool _structFieldAttribute; // in array/map of struct + bool _use_two_phase_put; public: WriteField(AttributeVector &attribute); ~WriteField(); @@ -40,12 +43,18 @@ public: const FieldPath &getFieldPath() const { return _fieldPath; } void buildFieldPath(const DocumentType &docType); bool isStructFieldAttribute() const { return _structFieldAttribute; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; - class WriteContext - { + + /** + * Represents a set of fields (as attributes) that are handled by the same write thread. + */ + class WriteContext { ExecutorId _executorId; std::vector<WriteField> _fields; bool _hasStructFieldAttribute; + // When this is true, the context only contains a single field. + bool _use_two_phase_put; public: WriteContext(ExecutorId executorId); WriteContext(WriteContext &&rhs) noexcept; @@ -56,6 +65,7 @@ public: ExecutorId getExecutorId() const { return _executorId; } const std::vector<WriteField> &getFields() const { return _fields; } bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; private: using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>; @@ -103,6 +113,11 @@ public: void onReplayDone(uint32_t docIdLimit) override; bool hasStructFieldAttribute() const override; + + // Should only be used for unit testing. + const std::vector<WriteContext>& get_write_contexts() const { + return _writeContexts; + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp index 8fd47c17acb..d7cf6caff28 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp @@ -31,6 +31,7 @@ LOG_SETUP(".proton.common.attribute_updater"); using namespace document; using vespalib::make_string; +using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::attribute::ReferenceAttribute; @@ -471,27 +472,33 @@ AttributeUpdater::updateValue(StringAttribute & vec, uint32_t lid, const FieldVa } } +namespace { + +template <typename ExpFieldValueType> void -AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) +validate_field_value_type(const FieldValue& val, const vespalib::string& attr_type, const vespalib::string& value_type) { - if (!val.inherits(PredicateFieldValue::classId)) { + if (!val.inherits(ExpFieldValueType::classId)) { throw UpdateException( - make_string("PredicateAttribute must be updated with " - "PredicateFieldValues.")); + make_string("%s must be updated with %s, but was '%s'", + attr_type.c_str(), value_type.c_str(), val.toString(false).c_str())); } +} + +} + +void +AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) +{ + validate_field_value_type<PredicateFieldValue>(val, "PredicateAttribute", "PredicateFieldValue"); vec.updateValue(lid, static_cast<const PredicateFieldValue &>(val)); } void AttributeUpdater::updateValue(TensorAttribute &vec, uint32_t lid, const FieldValue &val) { - if (!val.inherits(TensorFieldValue::classId)) { - throw UpdateException( - make_string("TensorAttribute must be updated with " - "TensorFieldValues.")); - } - const auto &tensor = static_cast<const TensorFieldValue &>(val). - getAsTensorPtr(); + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto &tensor = static_cast<const TensorFieldValue &>(val).getAsTensorPtr(); if (tensor) { vec.setTensor(lid, *tensor); } else { @@ -506,7 +513,7 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field vec.clearDoc(lid); throw UpdateException( make_string("ReferenceAttribute must be updated with " - "ReferenceFieldValues.")); + "ReferenceFieldValue, but was '%s'", val.toString(false).c_str())); } const auto &reffv = static_cast<const ReferenceFieldValue &>(val); if (reffv.hasValidDocumentId()) { @@ -516,4 +523,57 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field } } +namespace { + +void +validate_tensor_attribute_type(AttributeVector& attr) +{ + const auto& info = attr.getClass(); + if (!info.inherits(TensorAttribute::classId)) { + throw UpdateException( + make_string("Expected attribute vector '%s' to be a TensorAttribute, but was '%s'", + attr.getName().c_str(), info.name())); + } +} + +std::unique_ptr<PrepareResult> +prepare_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val) +{ + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); + if (tensor) { + return attr.prepare_set_tensor(docid, *tensor); + } + return std::unique_ptr<PrepareResult>(); +} + +void +complete_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val, std::unique_ptr<PrepareResult> prepare_result) +{ + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); + if (tensor) { + attr.complete_set_tensor(docid, *tensor, std::move(prepare_result)); + } else { + attr.clearDoc(docid); + } +} + +} + +std::unique_ptr<PrepareResult> +AttributeUpdater::prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val) +{ + validate_tensor_attribute_type(attr); + return prepare_set_tensor(static_cast<TensorAttribute&>(attr), docid, val); +} + +void +AttributeUpdater::complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, + std::unique_ptr<PrepareResult> prepare_result) +{ + validate_tensor_attribute_type(attr); + complete_set_tensor(static_cast<TensorAttribute&>(attr), docid, val, std::move(prepare_result)); +} + } // namespace search diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h index 01be6299692..32d14f6dd5a 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h @@ -10,7 +10,10 @@ namespace search { class PredicateAttribute; -namespace tensor { class TensorAttribute; } +namespace tensor { + class PrepareResult; + class TensorAttribute; +} namespace attribute {class ReferenceAttribute; } VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); @@ -20,14 +23,18 @@ VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); */ class AttributeUpdater { using Field = document::Field; - using FieldValue = document::FieldValue; using FieldUpdate = document::FieldUpdate; + using FieldValue = document::FieldValue; using ValueUpdate = document::ValueUpdate; public: static void handleUpdate(AttributeVector & vec, uint32_t lid, const FieldUpdate & upd); static void handleValue(AttributeVector & vec, uint32_t lid, const FieldValue & val); + static std::unique_ptr<tensor::PrepareResult> prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val); + static void complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, + std::unique_ptr<tensor::PrepareResult> prepare_result); + private: template <typename V> static void handleUpdate(V & vec, uint32_t lid, const ValueUpdate & upd); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h index 3e49bb449ff..8e5d3018532 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h @@ -11,16 +11,26 @@ namespace proton::test { class MockAttributeManager : public IAttributeManager { private: search::attribute::test::MockAttributeManager _mock; + std::vector<search::AttributeVector*> _writables; std::unique_ptr<ImportedAttributesRepo> _importedAttributes; + vespalib::ISequencedTaskExecutor* _writer; public: MockAttributeManager() : _mock(), - _importedAttributes() + _writables(), + _importedAttributes(), + _writer() {} - void addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { + search::AttributeVector::SP addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { _mock.addAttribute(name, attr); + _writables.push_back(attr.get()); + return attr; + } + + void set_writer(vespalib::ISequencedTaskExecutor& writer) { + _writer = &writer; } search::AttributeGuard::UP getAttribute(const vespalib::string &name) const override { @@ -56,13 +66,18 @@ public: HDR_ABORT("should not be reached"); } vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const override { - HDR_ABORT("should not be reached"); - } - search::AttributeVector *getWritableAttribute(const vespalib::string &) const override { + assert(_writer != nullptr); + return *_writer; + } + search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override { + auto attr = getAttribute(name); + if (attr) { + return attr->get(); + } return nullptr; } const std::vector<search::AttributeVector *> &getWritableAttributes() const override { - HDR_ABORT("should not be reached"); + return _writables; } void asyncForEachAttribute(std::shared_ptr<IConstAttributeFunctor>) const override { } diff --git a/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp b/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp index a7ad475d6aa..4656a5e9edd 100644 --- a/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp +++ b/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp @@ -10,38 +10,36 @@ using namespace document; namespace search::index { namespace { -TensorDataType tensorDataType(vespalib::eval::ValueType::from_spec("tensor(x{}, y{})")); - -const DataType *convert(Schema::DataType type) { +DataType::Type convert(Schema::DataType type) { switch (type) { case schema::DataType::BOOL: case schema::DataType::UINT2: case schema::DataType::UINT4: case schema::DataType::INT8: - return DataType::BYTE; + return DataType::T_BYTE; case schema::DataType::INT16: - return DataType::SHORT; + return DataType::T_SHORT; case schema::DataType::INT32: - return DataType::INT; + return DataType::T_INT; case schema::DataType::INT64: - return DataType::LONG; + return DataType::T_LONG; case schema::DataType::FLOAT: - return DataType::FLOAT; + return DataType::T_FLOAT; case schema::DataType::DOUBLE: - return DataType::DOUBLE; + return DataType::T_DOUBLE; case schema::DataType::STRING: - return DataType::STRING; + return DataType::T_STRING; case schema::DataType::RAW: - return DataType::RAW; + return DataType::T_RAW; case schema::DataType::BOOLEANTREE: - return DataType::PREDICATE; + return DataType::T_PREDICATE; case schema::DataType::TENSOR: - return &tensorDataType; + return DataType::T_TENSOR; default: break; } assert(!"Unknown datatype in schema"); - return 0; + return DataType::MAX; } void @@ -142,12 +140,12 @@ document::DocumenttypesConfig DocTypeBuilder::makeConfig() const { if (usf != usedFields.end()) { continue; // taken as index field } - const DataType *primitiveType = convert(field.getDataType()); - if (primitiveType->getId() == DataType::T_TENSOR) { - header_struct.addTensorField(field.getName(), dynamic_cast<const TensorDataType &>(*primitiveType).getTensorType().to_spec()); + auto type_id = convert(field.getDataType()); + if (type_id == DataType::T_TENSOR) { + header_struct.addTensorField(field.getName(), field.get_tensor_spec()); } else { header_struct.addField(field.getName(), type_cache.getType( - primitiveType->getId(), field.getCollectionType())); + type_id, field.getCollectionType())); } usedFields.insert(field.getName()); } @@ -158,12 +156,12 @@ document::DocumenttypesConfig DocTypeBuilder::makeConfig() const { if (usf != usedFields.end()) { continue; // taken as index field or attribute field } - const DataType *primitiveType(convert(field.getDataType())); - if (primitiveType->getId() == DataType::T_TENSOR) { - header_struct.addTensorField(field.getName(), dynamic_cast<const TensorDataType &>(*primitiveType).getTensorType().to_spec()); + auto type_id = convert(field.getDataType()); + if (type_id == DataType::T_TENSOR) { + header_struct.addTensorField(field.getName(), field.get_tensor_spec()); } else { header_struct.addField(field.getName(), type_cache.getType( - primitiveType->getId(), field.getCollectionType())); + type_id, field.getCollectionType())); } usedFields.insert(field.getName()); } diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp index f8db11ae9d8..6cf4f6d2689 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp @@ -263,7 +263,7 @@ TensorAttribute::prepare_set_tensor(DocId docid, const Tensor& tensor) const void TensorAttribute::complete_set_tensor(DocId docid, const Tensor& tensor, - std::future<std::unique_ptr<PrepareResult>> prepare_result) + std::unique_ptr<PrepareResult> prepare_result) { (void) docid; (void) tensor; diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h index f752b9f7f2e..8380e485172 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h @@ -7,7 +7,6 @@ #include "tensor_store.h" #include <vespa/searchlib/attribute/not_implemented_attribute.h> #include <vespa/vespalib/util/rcuvector.h> -#include <future> namespace search::tensor { @@ -66,9 +65,9 @@ public: * Performs the complete step in a two-phase operation to set a tensor for a document. * * This function is only called by the attribute writer thread. - * It must wait for the result from the prepare step (via the future) before it does the modifying changes. + * It uses the result from the prepare step to do the modifying changes. */ - virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::future<std::unique_ptr<PrepareResult>> prepare_result); + virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::unique_ptr<PrepareResult> prepare_result); virtual void compactWorst() = 0; }; |