diff options
17 files changed, 133 insertions, 551 deletions
diff --git a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h index c8b196023d6..3e3683ce60f 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h +++ b/searchcommon/src/vespa/searchcommon/attribute/hnsw_index_params.h @@ -16,29 +16,24 @@ private: uint32_t _neighbors_to_explore_at_insert; // This is always the same as in the attribute config, and is duplicated here to simplify usage. DistanceMetric _distance_metric; - bool _allow_multi_threaded_indexing; public: HnswIndexParams(uint32_t max_links_per_node_in, uint32_t neighbors_to_explore_at_insert_in, - DistanceMetric distance_metric_in, - bool allow_multi_threaded_indexing_in = false) + DistanceMetric distance_metric_in) : _max_links_per_node(max_links_per_node_in), _neighbors_to_explore_at_insert(neighbors_to_explore_at_insert_in), - _distance_metric(distance_metric_in), - _allow_multi_threaded_indexing(allow_multi_threaded_indexing_in) + _distance_metric(distance_metric_in) {} uint32_t max_links_per_node() const { return _max_links_per_node; } uint32_t neighbors_to_explore_at_insert() const { return _neighbors_to_explore_at_insert; } DistanceMetric distance_metric() const { return _distance_metric; } - bool allow_multi_threaded_indexing() const { return _allow_multi_threaded_indexing; } bool operator==(const HnswIndexParams& rhs) const { return (_max_links_per_node == rhs._max_links_per_node && _neighbors_to_explore_at_insert == rhs._neighbors_to_explore_at_insert && - _distance_metric == rhs._distance_metric && - _allow_multi_threaded_indexing == rhs._allow_multi_threaded_indexing); + _distance_metric == rhs._distance_metric); } }; diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.cpp b/searchcommon/src/vespa/searchcommon/attribute/status.cpp index f2bb49c348a..da13548ec2e 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.cpp +++ b/searchcommon/src/vespa/searchcommon/attribute/status.cpp @@ -20,42 +20,6 @@ Status::Status() { } -Status::Status(const Status& rhs) - : _numDocs(rhs._numDocs), - _numValues(rhs._numValues), - _numUniqueValues(rhs._numUniqueValues), - _allocated(rhs._allocated), - _used(rhs._used), - _dead(rhs._dead), - _unused(rhs._unused), - _onHold(rhs._onHold), - _onHoldMax(rhs._onHoldMax), - _lastSyncToken(rhs.getLastSyncToken()), - _updates(rhs._updates), - _nonIdempotentUpdates(rhs._nonIdempotentUpdates), - _bitVectors(rhs._bitVectors) -{ -} - -Status& -Status::operator=(const Status& rhs) -{ - _numDocs = rhs._numDocs; - _numValues = rhs._numValues; - _numUniqueValues = rhs._numUniqueValues; - _allocated = rhs._allocated; - _used = rhs._used; - _dead = rhs._dead; - _unused = rhs._unused; - _onHold = rhs._onHold; - _onHoldMax = rhs._onHoldMax; - setLastSyncToken(rhs.getLastSyncToken()); - _updates = rhs._updates; - _nonIdempotentUpdates = rhs._nonIdempotentUpdates; - _bitVectors = rhs._bitVectors; - return *this; -} - vespalib::string Status::createName(vespalib::stringref index, vespalib::stringref attr) { diff --git a/searchcommon/src/vespa/searchcommon/attribute/status.h b/searchcommon/src/vespa/searchcommon/attribute/status.h index a624309da65..888355b3f58 100644 --- a/searchcommon/src/vespa/searchcommon/attribute/status.h +++ b/searchcommon/src/vespa/searchcommon/attribute/status.h @@ -3,7 +3,6 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <atomic> namespace search::attribute { @@ -11,8 +10,6 @@ class Status { public: Status(); - Status(const Status& rhs); - Status& operator=(const Status& rhs); void updateStatistics(uint64_t numValues, uint64_t numUniqueValue, uint64_t allocated, uint64_t used, uint64_t dead, uint64_t onHold); @@ -25,15 +22,14 @@ public: uint64_t getDead() const { return _dead; } uint64_t getOnHold() const { return _onHold; } uint64_t getOnHoldMax() const { return _onHoldMax; } - // This might be accessed from other threads than the writer thread. - uint64_t getLastSyncToken() const { return _lastSyncToken.load(std::memory_order_relaxed); } + uint64_t getLastSyncToken() const { return _lastSyncToken; } uint64_t getUpdateCount() const { return _updates; } uint64_t getNonIdempotentUpdateCount() const { return _nonIdempotentUpdates; } uint32_t getBitVectors() const { return _bitVectors; } void setNumDocs(uint64_t v) { _numDocs = v; } void incNumDocs() { ++_numDocs; } - void setLastSyncToken(uint64_t v) { _lastSyncToken.store(v, std::memory_order_relaxed); } + void setLastSyncToken(uint64_t v) { _lastSyncToken = v; } void incUpdates(uint64_t v=1) { _updates += v; } void incNonIdempotentUpdates(uint64_t v = 1) { _nonIdempotentUpdates += v; } void incBitVectors() { ++_bitVectors; } @@ -51,7 +47,7 @@ private: uint64_t _unused; uint64_t _onHold; uint64_t _onHoldMax; - std::atomic<uint64_t> _lastSyncToken; + uint64_t _lastSyncToken; uint64_t _updates; uint64_t _nonIdempotentUpdates; uint32_t _bitVectors; diff --git a/searchcommon/src/vespa/searchcommon/common/schema.cpp b/searchcommon/src/vespa/searchcommon/common/schema.cpp index c59edbef22f..a21cc43572e 100644 --- a/searchcommon/src/vespa/searchcommon/common/schema.cpp +++ b/searchcommon/src/vespa/searchcommon/common/schema.cpp @@ -70,20 +70,16 @@ namespace index { const uint32_t Schema::UNKNOWN_FIELD_ID(std::numeric_limits<uint32_t>::max()); Schema::Field::Field(vespalib::stringref n, DataType dt) - : Field(n, dt, schema::CollectionType::SINGLE, "") + : _name(n), + _dataType(dt), + _collectionType(schema::CollectionType::SINGLE) { } Schema::Field::Field(vespalib::stringref n, DataType dt, CollectionType ct) - : Field(n, dt, ct, "") -{ -} - -Schema::Field::Field(vespalib::stringref n, DataType dt, CollectionType ct, vespalib::stringref tensor_spec) : _name(n), _dataType(dt), - _collectionType(ct), - _tensor_spec(tensor_spec) + _collectionType(ct) { } @@ -115,14 +111,15 @@ Schema::Field::operator==(const Field &rhs) const { return _name == rhs._name && _dataType == rhs._dataType && - _collectionType == rhs._collectionType && - _tensor_spec == rhs._tensor_spec; + _collectionType == rhs._collectionType; } bool Schema::Field::operator!=(const Field &rhs) const { - return !((*this) == rhs); + return _name != rhs._name || + _dataType != rhs._dataType || + _collectionType != rhs._collectionType; } Schema::IndexField::IndexField(vespalib::stringref name, DataType dt) diff --git a/searchcommon/src/vespa/searchcommon/common/schema.h b/searchcommon/src/vespa/searchcommon/common/schema.h index 9003578adaf..e17d219d7e8 100644 --- a/searchcommon/src/vespa/searchcommon/common/schema.h +++ b/searchcommon/src/vespa/searchcommon/common/schema.h @@ -35,12 +35,10 @@ public: vespalib::string _name; DataType _dataType; CollectionType _collectionType; - vespalib::string _tensor_spec; public: Field(vespalib::stringref n, DataType dt); Field(vespalib::stringref n, DataType dt, CollectionType ct); - Field(vespalib::stringref n, DataType dt, CollectionType ct, vespalib::stringref tensor_spec); /** * Create this field based on the given config lines. @@ -60,7 +58,6 @@ public: const vespalib::string &getName() const { return _name; } DataType getDataType() const { return _dataType; } CollectionType getCollectionType() const { return _collectionType; } - const vespalib::string& get_tensor_spec() const { return _tensor_spec; } bool matchingTypes(const Field &rhs) const { return getDataType() == rhs.getDataType() && diff --git a/searchcore/src/tests/proton/attribute/CMakeLists.txt b/searchcore/src/tests/proton/attribute/CMakeLists.txt index c23d97c6e88..1f385628480 100644 --- a/searchcore/src/tests/proton/attribute/CMakeLists.txt +++ b/searchcore/src/tests/proton/attribute/CMakeLists.txt @@ -7,7 +7,6 @@ vespa_add_executable(searchcore_attribute_test_app TEST searchcore_attribute searchcore_flushengine searchcore_pcommon - searchlib_test gtest ) vespa_add_test(NAME searchcore_attribute_test_app COMMAND searchcore_attribute_test_app) diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index feebe63f01a..6284fe119c4 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -19,7 +19,6 @@ #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/test/attribute_utils.h> -#include <vespa/searchcore/proton/test/mock_attribute_manager.h> #include <vespa/searchcorespi/flush/iflushtarget.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> #include <vespa/searchlib/attribute/attributefactory.h> @@ -34,7 +33,6 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/predicate/predicate_hash.h> #include <vespa/searchlib/predicate/predicate_index.h> -#include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/test/directory_handler.h> #include <vespa/vespalib/btree/btreeroot.hpp> @@ -59,11 +57,8 @@ using namespace vespa::config::search; using proton::ImportedAttributesRepo; using proton::test::AttributeUtils; -using proton::test::MockAttributeManager; using search::TuneFileAttributes; using search::attribute::BitVectorSearchCache; -using search::attribute::DistanceMetric; -using search::attribute::HnswIndexParams; using search::attribute::IAttributeVector; using search::attribute::ImportedAttributeVector; using search::attribute::ImportedAttributeVectorFactory; @@ -72,8 +67,6 @@ using search::index::DummyFileHeaderContext; using search::index::schema::CollectionType; using search::predicate::PredicateHash; using search::predicate::PredicateIndex; -using search::tensor::DenseTensorAttribute; -using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::test::DirectoryHandler; using std::string; @@ -125,16 +118,20 @@ const std::shared_ptr<IDestructorCallback> emptyCallback; class AttributeWriterTest : public ::testing::Test { public: DirectoryHandler _dirHandler; + DummyFileHeaderContext _fileHeaderContext; std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal; std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter; - std::shared_ptr<MockAttributeManager> _mgr; + HwInfo _hwInfo; + proton::AttributeManager::SP _m; std::unique_ptr<AttributeWriter> _aw; AttributeWriterTest() : _dirHandler(test_dir), + _fileHeaderContext(), _attributeFieldWriterReal(), _attributeFieldWriter(), - _mgr(), + _hwInfo(), + _m(), _aw() { setup(1); @@ -144,26 +141,21 @@ public: _aw.reset(); _attributeFieldWriterReal = std::make_unique<ForegroundTaskExecutor>(threads); _attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal); - _mgr = std::make_shared<MockAttributeManager>(); - _mgr->set_writer(*_attributeFieldWriter); + _m = std::make_shared<proton::AttributeManager>(test_dir, "test.subdb", TuneFileAttributes(), + _fileHeaderContext, *_attributeFieldWriter, _hwInfo); allocAttributeWriter(); } void allocAttributeWriter() { - _aw = std::make_unique<AttributeWriter>(_mgr); + _aw = std::make_unique<AttributeWriter>(_m); } AttributeVector::SP addAttribute(const vespalib::string &name) { - return addAttribute({name, AVConfig(AVBasicType::INT32)}); + return addAttribute({name, AVConfig(AVBasicType::INT32)}, createSerialNum); } - AttributeVector::SP addAttribute(const AttributeSpec &spec) { - auto ret = _mgr->addAttribute(spec.getName(), - AttributeFactory::createAttribute(spec.getName(), spec.getConfig())); + AttributeVector::SP addAttribute(const AttributeSpec &spec, SerialNum serialNum) { + auto ret = _m->addAttribute(spec, serialNum); allocAttributeWriter(); return ret; } - void add_attribute(AttributeVector::SP attr) { - _mgr->addAttribute(attr->getName(), std::move(attr)); - allocAttributeWriter(); - } void put(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit = true) { _aw->put(serialNum, doc, lid, immediateCommit, emptyCallback); @@ -203,9 +195,9 @@ TEST_F(AttributeWriterTest, handles_put) DocBuilder idb(s); auto a1 = addAttribute("a1"); - auto a2 = addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); - auto a3 = addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}); - auto a4 = addAttribute({"a4", AVConfig(AVBasicType::STRING)}); + auto a2 = addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + auto a3 = addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}, createSerialNum); + auto a4 = addAttribute({"a4", AVConfig(AVBasicType::STRING)}, createSerialNum); attribute::IntegerContent ibuf; attribute::FloatContent fbuf; @@ -283,7 +275,7 @@ TEST_F(AttributeWriterTest, handles_predicate_put) s.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); DocBuilder idb(s); - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); PredicateIndex &index = static_cast<PredicateAttribute &>(*a1).getIndex(); @@ -377,7 +369,7 @@ verifyAttributeContent(const AttributeVector & v, uint32_t lid, vespalib::string TEST_F(AttributeWriterTest, visibility_delay_is_honoured) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::STRING)}); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::STRING)}, createSerialNum); Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::STRING, CollectionType::SINGLE)); DocBuilder idb(s); @@ -389,7 +381,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) put(3, *doc, 1); EXPECT_EQ(2u, a1->getNumDocs()); EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayed(_mgr); + AttributeWriter awDelayed(_m); awDelayed.put(4, *doc, 2, false, emptyCallback); EXPECT_EQ(3u, a1->getNumDocs()); EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); @@ -399,7 +391,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) awDelayed.forceCommit(6, emptyCallback); EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayedShort(_mgr); + AttributeWriter awDelayedShort(_m); awDelayedShort.put(7, *doc, 2, false, emptyCallback); EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); awDelayedShort.put(8, *doc, 2, false, emptyCallback); @@ -422,7 +414,7 @@ TEST_F(AttributeWriterTest, visibility_delay_is_honoured) TEST_F(AttributeWriterTest, handles_predicate_remove) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); Schema s; s.addAttributeField( Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); @@ -485,7 +477,7 @@ TEST_F(AttributeWriterTest, handles_update) TEST_F(AttributeWriterTest, handles_predicate_update) { - auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); Schema schema; schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); @@ -632,20 +624,18 @@ Tensor::UP make_tensor(const TensorSpec &spec) { return Tensor::UP(dynamic_cast<Tensor*>(tensor.release())); } -const vespalib::string sparse_tensor = "tensor(x{},y{})"; - AttributeVector::SP createTensorAttribute(AttributeWriterTest &t) { AVConfig cfg(AVBasicType::TENSOR); - cfg.setTensorType(ValueType::from_spec(sparse_tensor)); - auto ret = t.addAttribute({"a1", cfg}); + cfg.setTensorType(ValueType::from_spec("tensor(x{},y{})")); + auto ret = t.addAttribute({"a1", cfg}, createSerialNum); return ret; } Schema -createTensorSchema(const vespalib::string& tensor_spec = sparse_tensor) { +createTensorSchema() { Schema schema; - schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE, tensor_spec)); + schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE)); return schema; } @@ -663,7 +653,7 @@ TEST_F(AttributeWriterTest, can_write_to_tensor_attribute) auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec(sparse_tensor) + auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") .add({{"x", "4"}, {"y", "5"}}, 7)); Document::UP doc = createTensorPutDoc(builder, *tensor); put(1, *doc, 1); @@ -680,7 +670,7 @@ TEST_F(AttributeWriterTest, handles_tensor_assign_update) auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec(sparse_tensor) + auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") .add({{"x", "6"}, {"y", "7"}}, 9)); auto doc = createTensorPutDoc(builder, *tensor); put(1, *doc, 1); @@ -693,9 +683,9 @@ TEST_F(AttributeWriterTest, handles_tensor_assign_update) const document::DocumentType &dt(builder.getDocumentType()); DocumentUpdate upd(*builder.getDocumentTypeRepo(), dt, DocumentId("id:ns:searchdocument::1")); - auto new_tensor = make_tensor(TensorSpec(sparse_tensor) + auto new_tensor = make_tensor(TensorSpec("tensor(x{},y{})") .add({{"x", "8"}, {"y", "9"}}, 11)); - TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec(sparse_tensor)); + TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec("tensor(x{},y{})")); TensorFieldValue new_value(xySparseTensorDataType); new_value = new_tensor->clone(); upd.addUpdate(FieldUpdate(upd.getType().getField("a1")) @@ -734,9 +724,9 @@ putAttributes(AttributeWriterTest &t, std::vector<uint32_t> expExecuteHistory) DocBuilder idb(s); - auto a1 = t.addAttribute("a1"); - auto a2 = t.addAttribute("a2"); - auto a3 = t.addAttribute("a3"); + AttributeVector::SP a1 = t.addAttribute("a1"); + AttributeVector::SP a2 = t.addAttribute("a2"); + AttributeVector::SP a3 = t.addAttribute("a3"); EXPECT_EQ(1u, a1->getNumDocs()); EXPECT_EQ(1u, a2->getNumDocs()); @@ -771,106 +761,6 @@ TEST_F(AttributeWriterTest, spreads_write_over_3_write_contexts) putAttributes(*this, {0, 1, 2}); } -struct MockPrepareResult : public PrepareResult { - uint32_t docid; - const Tensor& tensor; - MockPrepareResult(uint32_t docid_in, const Tensor& tensor_in) : docid(docid_in), tensor(tensor_in) {} -}; - -class MockDenseTensorAttribute : public DenseTensorAttribute { -public: - mutable size_t prepare_set_tensor_cnt; - mutable size_t complete_set_tensor_cnt; - - MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg) - : DenseTensorAttribute(name, cfg), - prepare_set_tensor_cnt(0), - complete_set_tensor_cnt(0) - {} - std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Tensor& tensor) const override { - ++prepare_set_tensor_cnt; - return std::make_unique<MockPrepareResult>(docid, tensor); - } - - virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::unique_ptr<PrepareResult> prepare_result) override { - ++complete_set_tensor_cnt; - assert(prepare_result); - auto* mock_result = dynamic_cast<MockPrepareResult*>(prepare_result.get()); - assert(mock_result); - EXPECT_EQ(docid, mock_result->docid); - EXPECT_EQ(tensor, mock_result->tensor); - } -}; - -const vespalib::string dense_tensor = "tensor(x[2])"; - -AVConfig -get_tensor_config(bool allow_multi_threaded_indexing) -{ - AVConfig cfg(AVBasicType::TENSOR); - cfg.setTensorType(ValueType::from_spec(dense_tensor)); - cfg.set_hnsw_index_params(HnswIndexParams(4, 4, DistanceMetric::Euclidean, allow_multi_threaded_indexing)); - return cfg; -} - -std::shared_ptr<MockDenseTensorAttribute> -make_mock_tensor_attribute(const vespalib::string& name, bool allow_multi_threaded_indexing) -{ - auto cfg = get_tensor_config(allow_multi_threaded_indexing); - return std::make_shared<MockDenseTensorAttribute>(name, cfg); -} - -TEST_F(AttributeWriterTest, tensor_attributes_using_two_phase_put_are_in_separate_write_contexts) -{ - addAttribute("a1"); - addAttribute({"t1", get_tensor_config(true)}); - addAttribute({"t2", get_tensor_config(true)}); - addAttribute({"t3", get_tensor_config(false)}); - allocAttributeWriter(); - - const auto& ctx = _aw->get_write_contexts(); - EXPECT_EQ(3, ctx.size()); - EXPECT_FALSE(ctx[0].use_two_phase_put()); - EXPECT_EQ(2, ctx[0].getFields().size()); - - EXPECT_TRUE(ctx[1].use_two_phase_put()); - EXPECT_EQ(1, ctx[1].getFields().size()); - EXPECT_EQ("t1", ctx[1].getFields()[0].getAttribute().getName()); - - EXPECT_TRUE(ctx[2].use_two_phase_put()); - EXPECT_EQ(1, ctx[2].getFields().size()); - EXPECT_EQ("t2", ctx[2].getFields()[0].getAttribute().getName()); -} - -TEST_F(AttributeWriterTest, handles_put_in_two_phases_when_specified_for_tensor_attribute) -{ - setup(2); - auto a1 = make_mock_tensor_attribute("a1", true); - add_attribute(a1); - Schema schema = createTensorSchema(dense_tensor); - DocBuilder builder(schema); - auto tensor = make_tensor(TensorSpec(dense_tensor) - .add({{"x", 0}}, 3).add({{"x", 1}}, 5)); - auto doc = createTensorPutDoc(builder, *tensor); - - put(1, *doc, 1); - EXPECT_EQ(1, a1->prepare_set_tensor_cnt); - EXPECT_EQ(1, a1->complete_set_tensor_cnt); - assertExecuteHistory({1, 0}); - - put(2, *doc, 2); - EXPECT_EQ(2, a1->prepare_set_tensor_cnt); - EXPECT_EQ(2, a1->complete_set_tensor_cnt); - assertExecuteHistory({1, 0, 0, 0}); - - put(3, *doc, 3); - EXPECT_EQ(3, a1->prepare_set_tensor_cnt); - EXPECT_EQ(3, a1->complete_set_tensor_cnt); - // Note that the prepare step is executed round-robin between the 2 threads. - assertExecuteHistory({1, 0, 0, 0, 1, 0}); -} - - ImportedAttributeVector::SP createImportedAttribute(const vespalib::string &name) { @@ -895,10 +785,10 @@ createImportedAttributesRepo() TEST_F(AttributeWriterTest, forceCommit_clears_search_cache_in_imported_attribute_vectors) { - _mgr->setImportedAttributes(createImportedAttributesRepo()); + _m->setImportedAttributes(createImportedAttributesRepo()); commit(10); - EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); - EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); + EXPECT_EQ(0u, _m->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); + EXPECT_EQ(0u, _m->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); } class StructWriterTestBase : public AttributeWriterTest { @@ -913,7 +803,7 @@ public: _valueField("value", 2, *DataType::INT, true), _structFieldType("struct") { - addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}); + addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}, createSerialNum); _type.addField(_valueField); _structFieldType.addField(_valueField); } @@ -947,7 +837,7 @@ public: _structArrayFieldType(_structFieldType), _structArrayField("array", _structArrayFieldType, true) { - addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); _type.addField(_structArrayField); } ~StructArrayWriterTest(); @@ -963,8 +853,8 @@ public: return doc; } void checkAttrs(uint32_t lid, int32_t value, const std::vector<int32_t> &arrayValues) { - auto valueAttr = _mgr->getAttribute("value")->getSP(); - auto arrayValueAttr = _mgr->getAttribute("array.value")->getSP(); + auto valueAttr = _m->getAttribute("value")->getSP(); + auto arrayValueAttr = _m->getAttribute("array.value")->getSP(); EXPECT_EQ(value, valueAttr->getInt(lid)); attribute::IntegerContent ibuf; ibuf.fill(*arrayValueAttr, lid); @@ -998,8 +888,8 @@ public: _structMapFieldType(*DataType::INT, _structFieldType), _structMapField("map", _structMapFieldType, true) { - addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); - addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); _type.addField(_structMapField); } @@ -1015,9 +905,9 @@ public: } void checkAttrs(uint32_t lid, int32_t expValue, const std::map<int32_t, int32_t> &expMap) { - auto valueAttr = _mgr->getAttribute("value")->getSP(); - auto mapKeyAttr = _mgr->getAttribute("map.key")->getSP(); - auto mapValueAttr = _mgr->getAttribute("map.value.value")->getSP(); + auto valueAttr = _m->getAttribute("value")->getSP(); + auto mapKeyAttr = _m->getAttribute("map.key")->getSP(); + auto mapValueAttr = _m->getAttribute("map.value.value")->getSP(); EXPECT_EQ(expValue, valueAttr->getInt(lid)); attribute::IntegerContent mapKeys; mapKeys.fill(*mapKeyAttr, lid); diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index b6d6d2437d8..e1785e1e48d 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -662,11 +662,10 @@ void addField(Schema & s, const std::string &name, Schema::DataType dtype, - Schema::CollectionType ctype, - const std::string& tensor_spec = "") + Schema::CollectionType ctype) { - s.addSummaryField(Schema::SummaryField(name, dtype, ctype, tensor_spec)); - s.addAttributeField(Schema::AttributeField(name, dtype, ctype, tensor_spec)); + s.addSummaryField(Schema::SummaryField(name, dtype, ctype)); + s.addAttributeField(Schema::AttributeField(name, dtype, ctype)); } @@ -683,7 +682,7 @@ Test::requireThatAttributesAreUsed() addField(s, "bg", schema::DataType::INT32, CollectionType::WEIGHTEDSET); addField(s, "bh", schema::DataType::FLOAT, CollectionType::WEIGHTEDSET); addField(s, "bi", schema::DataType::STRING, CollectionType::WEIGHTEDSET); - addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})"); + addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE); BuildContext bc(s); DBContext dc(bc._repo, getDocTypeName()); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index d1d08a332e3..18235116d27 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -285,8 +285,8 @@ SchemaContext::SchemaContext() : schema(new Schema()), builder() { - schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); - schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); + schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE)); + schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE)); addField("i1"); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 8f19d5c203b..33b9d162163 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -12,48 +12,26 @@ #include <vespa/searchcore/proton/common/attribute_updater.h> #include <vespa/searchlib/attribute/attributevector.hpp> #include <vespa/searchlib/attribute/imported_attribute_vector.h> -#include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/stllike/hash_map.hpp> -#include <future> #include <vespa/log/log.h> LOG_SETUP(".proton.attribute.attribute_writer"); using namespace document; using namespace search; - -using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; -using search::tensor::PrepareResult; using vespalib::ISequencedTaskExecutor; +using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; namespace proton { using LidVector = LidVectorContext::LidVector; -namespace { - -bool -use_two_phase_put_for_attribute(const AttributeVector& attr) -{ - const auto& cfg = attr.getConfig(); - if (cfg.basicType() == search::attribute::BasicType::Type::TENSOR && - cfg.hnsw_index_params().has_value() && - cfg.hnsw_index_params().value().allow_multi_threaded_indexing()) - { - return true; - } - return false; -} - -} - AttributeWriter::WriteField::WriteField(AttributeVector &attribute) : _fieldPath(), _attribute(attribute), - _structFieldAttribute(false), - _use_two_phase_put(use_two_phase_put_for_attribute(attribute)) + _structFieldAttribute(false) { const vespalib::string &name = attribute.getName(); _structFieldAttribute = attribute::isStructFieldAttribute(name); @@ -79,11 +57,11 @@ AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) AttributeWriter::WriteContext::WriteContext(ExecutorId executorId) : _executorId(executorId), _fields(), - _hasStructFieldAttribute(false), - _use_two_phase_put(false) + _hasStructFieldAttribute(false) { } + AttributeWriter::WriteContext::WriteContext(WriteContext &&rhs) noexcept = default; AttributeWriter::WriteContext::~WriteContext() = default; @@ -97,13 +75,6 @@ AttributeWriter::WriteContext::add(AttributeVector &attr) if (_fields.back().isStructFieldAttribute()) { _hasStructFieldAttribute = true; } - if (_fields.back().use_two_phase_put()) { - // Only support for one field per context when this is true. - assert(_fields.size() == 1); - _use_two_phase_put = true; - } else { - assert(!_use_two_phase_put); - } } void @@ -142,27 +113,6 @@ applyPutToAttribute(SerialNum serialNum, const FieldValue::UP &fieldValue, Docum } void -complete_put_to_attribute(SerialNum serial_num, - uint32_t docid, - AttributeVector& attr, - const FieldValue::SP& field_value, - std::future<std::unique_ptr<PrepareResult>>& result_future, - bool immediate_commit, - AttributeWriter::OnWriteDoneType) -{ - ensureLidSpace(serial_num, docid, attr); - if (field_value.get()) { - auto result = result_future.get(); - AttributeUpdater::complete_set_value(attr, docid, *field_value, std::move(result)); - } else { - attr.clearDoc(docid); - } - if (immediate_commit) { - attr.commit(serial_num, serial_num); - } -} - -void applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommit, AttributeVector &attr, AttributeWriter::OnWriteDoneType) { @@ -198,6 +148,7 @@ applyReplayDone(uint32_t docIdLimit, AttributeVector &attr) attr.shrinkLidSpace(); } + void applyHeartBeat(SerialNum serialNum, AttributeVector &attr) { @@ -215,6 +166,7 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVec } } + void applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr) { @@ -256,6 +208,7 @@ struct BatchUpdateTask : public vespalib::Executor::Task { } } + SerialNum _serialNum; DocumentIdT _lid; bool _immediateCommit; @@ -268,7 +221,6 @@ class FieldContext vespalib::string _name; ExecutorId _executorId; AttributeVector *_attr; - bool _use_two_phase_put; public: FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr); @@ -276,14 +228,13 @@ public: bool operator<(const FieldContext &rhs) const; ExecutorId getExecutorId() const { return _executorId; } AttributeVector *getAttribute() const { return _attr; } - bool use_two_phase_put() const { return _use_two_phase_put; } }; + FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr) : _name(attr->getName()), _executorId(writer.getExecutorId(attr->getNamePrefix())), - _attr(attr), - _use_two_phase_put(use_two_phase_put_for_attribute(*attr)) + _attr(attr) { } @@ -352,100 +303,6 @@ PutTask::run() } } - -class PreparePutTask : public vespalib::Executor::Task { -private: - const SerialNum _serial_num; - const uint32_t _docid; - AttributeVector& _attr; - FieldValue::SP _field_value; - std::promise<std::unique_ptr<PrepareResult>> _result_promise; - -public: - PreparePutTask(SerialNum serial_num_in, - uint32_t docid_in, - const AttributeWriter::WriteField& field, - std::shared_ptr<DocumentFieldExtractor> field_extractor); - ~PreparePutTask() override; - void run() override; - SerialNum serial_num() const { return _serial_num; } - uint32_t docid() const { return _docid; } - AttributeVector& attr() { return _attr; } - FieldValue::SP field_value() { return _field_value; } - std::future<std::unique_ptr<PrepareResult>> result_future() { - return _result_promise.get_future(); - } -}; - -PreparePutTask::PreparePutTask(SerialNum serial_num_in, - uint32_t docid_in, - const AttributeWriter::WriteField& field, - std::shared_ptr<DocumentFieldExtractor> field_extractor) - : _serial_num(serial_num_in), - _docid(docid_in), - _attr(field.getAttribute()), - _field_value(), - _result_promise() -{ - // Note: No need to store the field extractor as we are not extracting struct fields. - auto value = field_extractor->getFieldValue(field.getFieldPath()); - _field_value.reset(value.release()); -} - -PreparePutTask::~PreparePutTask() = default; - -void -PreparePutTask::run() -{ - if (_attr.getStatus().getLastSyncToken() < _serial_num) { - if (_field_value.get()) { - _result_promise.set_value(AttributeUpdater::prepare_set_value(_attr, _docid, *_field_value)); - } - } -} - -class CompletePutTask : public vespalib::Executor::Task { -private: - const SerialNum _serial_num; - const uint32_t _docid; - AttributeVector& _attr; - FieldValue::SP _field_value; - std::future<std::unique_ptr<PrepareResult>> _result_future; - const bool _immediate_commit; - std::remove_reference_t<AttributeWriter::OnWriteDoneType> _on_write_done; - -public: - CompletePutTask(PreparePutTask& prepare_task, - bool immediate_commit, - AttributeWriter::OnWriteDoneType on_write_done); - ~CompletePutTask() override; - void run() override; -}; - -CompletePutTask::CompletePutTask(PreparePutTask& prepare_task, - bool immediate_commit, - AttributeWriter::OnWriteDoneType on_write_done) - : _serial_num(prepare_task.serial_num()), - _docid(prepare_task.docid()), - _attr(prepare_task.attr()), - _field_value(prepare_task.field_value()), - _result_future(prepare_task.result_future()), - _immediate_commit(immediate_commit), - _on_write_done(on_write_done) -{ -} - -CompletePutTask::~CompletePutTask() = default; - -void -CompletePutTask::run() -{ - if (_attr.getStatus().getLastSyncToken() < _serial_num) { - complete_put_to_attribute(_serial_num, _docid, _attr, _field_value, _result_future, - _immediate_commit, _on_write_done); - } -} - class RemoveTask : public vespalib::Executor::Task { const AttributeWriter::WriteContext &_wc; @@ -459,6 +316,7 @@ public: void run() override; }; + RemoveTask::RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone) : _wc(wc), _serialNum(serialNum), @@ -561,22 +419,13 @@ AttributeWriter::setupWriteContexts() fieldContexts.emplace_back(_attributeFieldWriter, attr); } std::sort(fieldContexts.begin(), fieldContexts.end()); - for (const auto& fc : fieldContexts) { - if (fc.use_two_phase_put()) { - continue; - } + for (auto &fc : fieldContexts) { if (_writeContexts.empty() || (_writeContexts.back().getExecutorId() != fc.getExecutorId())) { _writeContexts.emplace_back(fc.getExecutorId()); } _writeContexts.back().add(*fc.getAttribute()); } - for (const auto& fc : fieldContexts) { - if (fc.use_two_phase_put()) { - _writeContexts.emplace_back(fc.getExecutorId()); - _writeContexts.back().add(*fc.getAttribute()); - } - } for (const auto &wc : _writeContexts) { if (wc.hasStructFieldAttribute()) { _hasStructFieldAttribute = true; @@ -603,19 +452,9 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI } auto extractor = std::make_shared<DocumentFieldExtractor>(doc); for (const auto &wc : _writeContexts) { - if (wc.use_two_phase_put()) { - assert(wc.getFields().size() == 1); - auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor); - auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone); - // We use the local docid to create an executor id to round-robin between the threads. - _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task)); - _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); - } else { - if (allAttributes || wc.hasStructFieldAttribute()) { - auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, - onWriteDone); - _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); - } + if (allAttributes || wc.hasStructFieldAttribute()) { + auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); } } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 726379220e3..4a9726dd113 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -19,23 +19,20 @@ namespace proton { class AttributeWriter : public IAttributeWriter { private: - using AttributeVector = search::AttributeVector; - using FieldPath = document::FieldPath; - using DataType = document::DataType; - using DocumentType = document::DocumentType; - using FieldValue = document::FieldValue; + typedef search::AttributeVector AttributeVector; + typedef document::FieldPath FieldPath; + typedef document::DataType DataType; + typedef document::DocumentType DocumentType; + typedef document::FieldValue FieldValue; const IAttributeManager::SP _mgr; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; public: - /** - * Represents an attribute vector for a field and details about how to write to it. - */ - class WriteField { + class WriteField + { FieldPath _fieldPath; AttributeVector &_attribute; bool _structFieldAttribute; // in array/map of struct - bool _use_two_phase_put; public: WriteField(AttributeVector &attribute); ~WriteField(); @@ -43,18 +40,12 @@ public: const FieldPath &getFieldPath() const { return _fieldPath; } void buildFieldPath(const DocumentType &docType); bool isStructFieldAttribute() const { return _structFieldAttribute; } - bool use_two_phase_put() const { return _use_two_phase_put; } }; - - /** - * Represents a set of fields (as attributes) that are handled by the same write thread. - */ - class WriteContext { + class WriteContext + { ExecutorId _executorId; std::vector<WriteField> _fields; bool _hasStructFieldAttribute; - // When this is true, the context only contains a single field. - bool _use_two_phase_put; public: WriteContext(ExecutorId executorId); WriteContext(WriteContext &&rhs) noexcept; @@ -65,7 +56,6 @@ public: ExecutorId getExecutorId() const { return _executorId; } const std::vector<WriteField> &getFields() const { return _fields; } bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; } - bool use_two_phase_put() const { return _use_two_phase_put; } }; private: using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>; @@ -113,11 +103,6 @@ public: void onReplayDone(uint32_t docIdLimit) override; bool hasStructFieldAttribute() const override; - - // Should only be used for unit testing. - const std::vector<WriteContext>& get_write_contexts() const { - return _writeContexts; - } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp index d7cf6caff28..8fd47c17acb 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp @@ -31,7 +31,6 @@ LOG_SETUP(".proton.common.attribute_updater"); using namespace document; using vespalib::make_string; -using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::attribute::ReferenceAttribute; @@ -472,33 +471,27 @@ AttributeUpdater::updateValue(StringAttribute & vec, uint32_t lid, const FieldVa } } -namespace { - -template <typename ExpFieldValueType> void -validate_field_value_type(const FieldValue& val, const vespalib::string& attr_type, const vespalib::string& value_type) +AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) { - if (!val.inherits(ExpFieldValueType::classId)) { + if (!val.inherits(PredicateFieldValue::classId)) { throw UpdateException( - make_string("%s must be updated with %s, but was '%s'", - attr_type.c_str(), value_type.c_str(), val.toString(false).c_str())); + make_string("PredicateAttribute must be updated with " + "PredicateFieldValues.")); } -} - -} - -void -AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) -{ - validate_field_value_type<PredicateFieldValue>(val, "PredicateAttribute", "PredicateFieldValue"); vec.updateValue(lid, static_cast<const PredicateFieldValue &>(val)); } void AttributeUpdater::updateValue(TensorAttribute &vec, uint32_t lid, const FieldValue &val) { - validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); - const auto &tensor = static_cast<const TensorFieldValue &>(val).getAsTensorPtr(); + if (!val.inherits(TensorFieldValue::classId)) { + throw UpdateException( + make_string("TensorAttribute must be updated with " + "TensorFieldValues.")); + } + const auto &tensor = static_cast<const TensorFieldValue &>(val). + getAsTensorPtr(); if (tensor) { vec.setTensor(lid, *tensor); } else { @@ -513,7 +506,7 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field vec.clearDoc(lid); throw UpdateException( make_string("ReferenceAttribute must be updated with " - "ReferenceFieldValue, but was '%s'", val.toString(false).c_str())); + "ReferenceFieldValues.")); } const auto &reffv = static_cast<const ReferenceFieldValue &>(val); if (reffv.hasValidDocumentId()) { @@ -523,57 +516,4 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field } } -namespace { - -void -validate_tensor_attribute_type(AttributeVector& attr) -{ - const auto& info = attr.getClass(); - if (!info.inherits(TensorAttribute::classId)) { - throw UpdateException( - make_string("Expected attribute vector '%s' to be a TensorAttribute, but was '%s'", - attr.getName().c_str(), info.name())); - } -} - -std::unique_ptr<PrepareResult> -prepare_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val) -{ - validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); - const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); - if (tensor) { - return attr.prepare_set_tensor(docid, *tensor); - } - return std::unique_ptr<PrepareResult>(); -} - -void -complete_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val, std::unique_ptr<PrepareResult> prepare_result) -{ - validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); - const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); - if (tensor) { - attr.complete_set_tensor(docid, *tensor, std::move(prepare_result)); - } else { - attr.clearDoc(docid); - } -} - -} - -std::unique_ptr<PrepareResult> -AttributeUpdater::prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val) -{ - validate_tensor_attribute_type(attr); - return prepare_set_tensor(static_cast<TensorAttribute&>(attr), docid, val); -} - -void -AttributeUpdater::complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, - std::unique_ptr<PrepareResult> prepare_result) -{ - validate_tensor_attribute_type(attr); - complete_set_tensor(static_cast<TensorAttribute&>(attr), docid, val, std::move(prepare_result)); -} - } // namespace search diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h index 32d14f6dd5a..01be6299692 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h @@ -10,10 +10,7 @@ namespace search { class PredicateAttribute; -namespace tensor { - class PrepareResult; - class TensorAttribute; -} +namespace tensor { class TensorAttribute; } namespace attribute {class ReferenceAttribute; } VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); @@ -23,18 +20,14 @@ VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); */ class AttributeUpdater { using Field = document::Field; - using FieldUpdate = document::FieldUpdate; using FieldValue = document::FieldValue; + using FieldUpdate = document::FieldUpdate; using ValueUpdate = document::ValueUpdate; public: static void handleUpdate(AttributeVector & vec, uint32_t lid, const FieldUpdate & upd); static void handleValue(AttributeVector & vec, uint32_t lid, const FieldValue & val); - static std::unique_ptr<tensor::PrepareResult> prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val); - static void complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, - std::unique_ptr<tensor::PrepareResult> prepare_result); - private: template <typename V> static void handleUpdate(V & vec, uint32_t lid, const ValueUpdate & upd); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h index 8e5d3018532..3e49bb449ff 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h @@ -11,26 +11,16 @@ namespace proton::test { class MockAttributeManager : public IAttributeManager { private: search::attribute::test::MockAttributeManager _mock; - std::vector<search::AttributeVector*> _writables; std::unique_ptr<ImportedAttributesRepo> _importedAttributes; - vespalib::ISequencedTaskExecutor* _writer; public: MockAttributeManager() : _mock(), - _writables(), - _importedAttributes(), - _writer() + _importedAttributes() {} - search::AttributeVector::SP addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { + void addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { _mock.addAttribute(name, attr); - _writables.push_back(attr.get()); - return attr; - } - - void set_writer(vespalib::ISequencedTaskExecutor& writer) { - _writer = &writer; } search::AttributeGuard::UP getAttribute(const vespalib::string &name) const override { @@ -66,18 +56,13 @@ public: HDR_ABORT("should not be reached"); } vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const override { - assert(_writer != nullptr); - return *_writer; - } - search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override { - auto attr = getAttribute(name); - if (attr) { - return attr->get(); - } + HDR_ABORT("should not be reached"); + } + search::AttributeVector *getWritableAttribute(const vespalib::string &) const override { return nullptr; } const std::vector<search::AttributeVector *> &getWritableAttributes() const override { - return _writables; + HDR_ABORT("should not be reached"); } void asyncForEachAttribute(std::shared_ptr<IConstAttributeFunctor>) const override { } diff --git a/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp b/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp index 4656a5e9edd..a7ad475d6aa 100644 --- a/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp +++ b/searchlib/src/vespa/searchlib/index/doctypebuilder.cpp @@ -10,36 +10,38 @@ using namespace document; namespace search::index { namespace { -DataType::Type convert(Schema::DataType type) { +TensorDataType tensorDataType(vespalib::eval::ValueType::from_spec("tensor(x{}, y{})")); + +const DataType *convert(Schema::DataType type) { switch (type) { case schema::DataType::BOOL: case schema::DataType::UINT2: case schema::DataType::UINT4: case schema::DataType::INT8: - return DataType::T_BYTE; + return DataType::BYTE; case schema::DataType::INT16: - return DataType::T_SHORT; + return DataType::SHORT; case schema::DataType::INT32: - return DataType::T_INT; + return DataType::INT; case schema::DataType::INT64: - return DataType::T_LONG; + return DataType::LONG; case schema::DataType::FLOAT: - return DataType::T_FLOAT; + return DataType::FLOAT; case schema::DataType::DOUBLE: - return DataType::T_DOUBLE; + return DataType::DOUBLE; case schema::DataType::STRING: - return DataType::T_STRING; + return DataType::STRING; case schema::DataType::RAW: - return DataType::T_RAW; + return DataType::RAW; case schema::DataType::BOOLEANTREE: - return DataType::T_PREDICATE; + return DataType::PREDICATE; case schema::DataType::TENSOR: - return DataType::T_TENSOR; + return &tensorDataType; default: break; } assert(!"Unknown datatype in schema"); - return DataType::MAX; + return 0; } void @@ -140,12 +142,12 @@ document::DocumenttypesConfig DocTypeBuilder::makeConfig() const { if (usf != usedFields.end()) { continue; // taken as index field } - auto type_id = convert(field.getDataType()); - if (type_id == DataType::T_TENSOR) { - header_struct.addTensorField(field.getName(), field.get_tensor_spec()); + const DataType *primitiveType = convert(field.getDataType()); + if (primitiveType->getId() == DataType::T_TENSOR) { + header_struct.addTensorField(field.getName(), dynamic_cast<const TensorDataType &>(*primitiveType).getTensorType().to_spec()); } else { header_struct.addField(field.getName(), type_cache.getType( - type_id, field.getCollectionType())); + primitiveType->getId(), field.getCollectionType())); } usedFields.insert(field.getName()); } @@ -156,12 +158,12 @@ document::DocumenttypesConfig DocTypeBuilder::makeConfig() const { if (usf != usedFields.end()) { continue; // taken as index field or attribute field } - auto type_id = convert(field.getDataType()); - if (type_id == DataType::T_TENSOR) { - header_struct.addTensorField(field.getName(), field.get_tensor_spec()); + const DataType *primitiveType(convert(field.getDataType())); + if (primitiveType->getId() == DataType::T_TENSOR) { + header_struct.addTensorField(field.getName(), dynamic_cast<const TensorDataType &>(*primitiveType).getTensorType().to_spec()); } else { header_struct.addField(field.getName(), type_cache.getType( - type_id, field.getCollectionType())); + primitiveType->getId(), field.getCollectionType())); } usedFields.insert(field.getName()); } diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp index 6cf4f6d2689..f8db11ae9d8 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp @@ -263,7 +263,7 @@ TensorAttribute::prepare_set_tensor(DocId docid, const Tensor& tensor) const void TensorAttribute::complete_set_tensor(DocId docid, const Tensor& tensor, - std::unique_ptr<PrepareResult> prepare_result) + std::future<std::unique_ptr<PrepareResult>> prepare_result) { (void) docid; (void) tensor; diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h index 8380e485172..f752b9f7f2e 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h @@ -7,6 +7,7 @@ #include "tensor_store.h" #include <vespa/searchlib/attribute/not_implemented_attribute.h> #include <vespa/vespalib/util/rcuvector.h> +#include <future> namespace search::tensor { @@ -65,9 +66,9 @@ public: * Performs the complete step in a two-phase operation to set a tensor for a document. * * This function is only called by the attribute writer thread. - * It uses the result from the prepare step to do the modifying changes. + * It must wait for the result from the prepare step (via the future) before it does the modifying changes. */ - virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::unique_ptr<PrepareResult> prepare_result); + virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::future<std::unique_ptr<PrepareResult>> prepare_result); virtual void compactWorst() = 0; }; |