diff options
Diffstat (limited to 'searchcore/src')
23 files changed, 982 insertions, 463 deletions
diff --git a/searchcore/src/tests/proton/attribute/CMakeLists.txt b/searchcore/src/tests/proton/attribute/CMakeLists.txt index 79f81f3daa1..c23d97c6e88 100644 --- a/searchcore/src/tests/proton/attribute/CMakeLists.txt +++ b/searchcore/src/tests/proton/attribute/CMakeLists.txt @@ -7,9 +7,11 @@ vespa_add_executable(searchcore_attribute_test_app TEST searchcore_attribute searchcore_flushengine searchcore_pcommon + searchlib_test + gtest ) -vespa_add_test(NAME searchcore_attribute_test_app COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/attribute_test.sh - DEPENDS searchcore_attribute_test_app) +vespa_add_test(NAME searchcore_attribute_test_app COMMAND searchcore_attribute_test_app) + vespa_add_executable(searchcore_attributeflush_test_app TEST SOURCES attributeflush_test.cpp diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 839ef14fcb0..91f580cd221 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -7,41 +7,43 @@ #include <vespa/document/update/arithmeticvalueupdate.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/update/documentupdate.h> -#include <vespa/eval/tensor/tensor.h> #include <vespa/eval/tensor/default_tensor_engine.h> +#include <vespa/eval/tensor/tensor.h> #include <vespa/searchcommon/attribute/attributecontent.h> +#include <vespa/searchcommon/attribute/iattributevector.h> #include <vespa/searchcore/proton/attribute/attribute_collection_spec_factory.h> #include <vespa/searchcore/proton/attribute/attribute_writer.h> -#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> #include <vespa/searchcore/proton/attribute/attributemanager.h> #include <vespa/searchcore/proton/attribute/filter_attribute_manager.h> +#include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/test/attribute_utils.h> +#include <vespa/searchcore/proton/test/mock_attribute_manager.h> #include <vespa/searchcorespi/flush/iflushtarget.h> -#include <vespa/searchlib/attribute/attributefactory.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> +#include <vespa/searchlib/attribute/attributefactory.h> #include <vespa/searchlib/attribute/bitvector_search_cache.h> #include <vespa/searchlib/attribute/imported_attribute_vector.h> #include <vespa/searchlib/attribute/imported_attribute_vector_factory.h> #include <vespa/searchlib/attribute/integerbase.h> #include <vespa/searchlib/attribute/predicate_attribute.h> -#include <vespa/vespalib/util/foregroundtaskexecutor.h> +#include <vespa/searchlib/attribute/singlenumericattribute.hpp> #include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/vespalib/util/sequencedtaskexecutorobserver.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/predicate/predicate_hash.h> #include <vespa/searchlib/predicate/predicate_index.h> +#include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/test/directory_handler.h> +#include <vespa/vespalib/btree/btreeroot.hpp> +#include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/test/insertion_operators.h> -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/searchcommon/attribute/iattributevector.h> -#include <vespa/vespalib/btree/btreeroot.hpp> -#include <vespa/searchlib/attribute/singlenumericattribute.hpp> +#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/foregroundtaskexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutorobserver.h> #include <vespa/log/log.h> LOG_SETUP("attribute_test"); @@ -57,8 +59,11 @@ using namespace vespa::config::search; using proton::ImportedAttributesRepo; using proton::test::AttributeUtils; +using proton::test::MockAttributeManager; using search::TuneFileAttributes; using search::attribute::BitVectorSearchCache; +using search::attribute::DistanceMetric; +using search::attribute::HnswIndexParams; using search::attribute::IAttributeVector; using search::attribute::ImportedAttributeVector; using search::attribute::ImportedAttributeVectorFactory; @@ -67,24 +72,25 @@ using search::index::DummyFileHeaderContext; using search::index::schema::CollectionType; using search::predicate::PredicateHash; using search::predicate::PredicateIndex; +using search::tensor::DenseTensorAttribute; +using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::test::DirectoryHandler; using std::string; -using vespalib::eval::ValueType; -using vespalib::eval::TensorSpec; -using vespalib::tensor::Tensor; -using vespalib::tensor::DefaultTensorEngine; using vespalib::ForegroundTaskExecutor; using vespalib::SequencedTaskExecutorObserver; +using vespalib::eval::TensorSpec; +using vespalib::eval::ValueType; +using vespalib::tensor::DefaultTensorEngine; +using vespalib::tensor::Tensor; -using AVConfig = search::attribute::Config; using AVBasicType = search::attribute::BasicType; using AVCollectionType = search::attribute::CollectionType; +using AVConfig = search::attribute::Config; using Int32AttributeVector = SingleValueNumericAttribute<IntegerAttributeTemplate<int32_t> >; using LidVector = LidVectorContext::LidVector; -namespace -{ +namespace { const uint64_t createSerialNum = 42u; @@ -116,45 +122,48 @@ fillAttribute(const AttributeVector::SP &attr, uint32_t from, uint32_t to, int64 const std::shared_ptr<IDestructorCallback> emptyCallback; -struct Fixture -{ +class AttributeWriterTest : public ::testing::Test { +public: DirectoryHandler _dirHandler; - DummyFileHeaderContext _fileHeaderContext; - ForegroundTaskExecutor _attributeFieldWriterReal; - SequencedTaskExecutorObserver _attributeFieldWriter; - HwInfo _hwInfo; - proton::AttributeManager::SP _m; + std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal; + std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter; + std::shared_ptr<MockAttributeManager> _mgr; std::unique_ptr<AttributeWriter> _aw; - Fixture(uint32_t threads) + AttributeWriterTest() : _dirHandler(test_dir), - _fileHeaderContext(), - _attributeFieldWriterReal(threads), - _attributeFieldWriter(_attributeFieldWriterReal), - _hwInfo(), - _m(std::make_shared<proton::AttributeManager> - (test_dir, "test.subdb", TuneFileAttributes(), - _fileHeaderContext, _attributeFieldWriter, _hwInfo)), + _attributeFieldWriterReal(), + _attributeFieldWriter(), + _mgr(), _aw() { - allocAttributeWriter(); + setup(1); } - Fixture() - : Fixture(1) - { + ~AttributeWriterTest(); + void setup(uint32_t threads) { + _aw.reset(); + _attributeFieldWriterReal = std::make_unique<ForegroundTaskExecutor>(threads); + _attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal); + _mgr = std::make_shared<MockAttributeManager>(); + _mgr->set_writer(*_attributeFieldWriter); + allocAttributeWriter(); } - ~Fixture(); void allocAttributeWriter() { - _aw = std::make_unique<AttributeWriter>(_m); + _aw = std::make_unique<AttributeWriter>(_mgr); } AttributeVector::SP addAttribute(const vespalib::string &name) { - return addAttribute({name, AVConfig(AVBasicType::INT32)}, createSerialNum); + return addAttribute({name, AVConfig(AVBasicType::INT32)}); } - AttributeVector::SP addAttribute(const AttributeSpec &spec, SerialNum serialNum) { - auto ret = _m->addAttribute(spec, serialNum); + AttributeVector::SP addAttribute(const AttributeSpec &spec) { + auto ret = _mgr->addAttribute(spec.getName(), + AttributeFactory::createAttribute(spec.getName(), spec.getConfig())); allocAttributeWriter(); return ret; } + void add_attribute(AttributeVector::SP attr) { + _mgr->addAttribute(attr->getName(), std::move(attr)); + allocAttributeWriter(); + } void put(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit = true) { _aw->put(serialNum, doc, lid, immediateCommit, emptyCallback); @@ -177,13 +186,13 @@ struct Fixture _aw->forceCommit(serialNum, emptyCallback); } void assertExecuteHistory(std::vector<uint32_t> expExecuteHistory) { - EXPECT_EQUAL(expExecuteHistory, _attributeFieldWriter.getExecuteHistory()); + EXPECT_EQ(expExecuteHistory, _attributeFieldWriter->getExecuteHistory()); } }; -Fixture::~Fixture() = default; +AttributeWriterTest::~AttributeWriterTest() = default; -TEST_F("require that attribute writer handles put", Fixture) +TEST_F(AttributeWriterTest, handles_put) { Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::INT32, CollectionType::SINGLE)); @@ -193,108 +202,108 @@ TEST_F("require that attribute writer handles put", Fixture) DocBuilder idb(s); - AttributeVector::SP a1 = f.addAttribute("a1"); - AttributeVector::SP a2 = f.addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); - AttributeVector::SP a3 = f.addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}, createSerialNum); - AttributeVector::SP a4 = f.addAttribute({"a4", AVConfig(AVBasicType::STRING)}, createSerialNum); + auto a1 = addAttribute("a1"); + auto a2 = addAttribute({"a2", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + auto a3 = addAttribute({"a3", AVConfig(AVBasicType::FLOAT)}); + auto a4 = addAttribute({"a4", AVConfig(AVBasicType::STRING)}); attribute::IntegerContent ibuf; attribute::FloatContent fbuf; attribute::ConstCharContent sbuf; { // empty document should give default values - EXPECT_EQUAL(1u, a1->getNumDocs()); - f.put(1, *idb.startDocument("id:ns:searchdocument::1").endDocument(), 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); - EXPECT_EQUAL(2u, a2->getNumDocs()); - EXPECT_EQUAL(2u, a3->getNumDocs()); - EXPECT_EQUAL(2u, a4->getNumDocs()); - EXPECT_EQUAL(1u, a1->getStatus().getLastSyncToken()); - EXPECT_EQUAL(1u, a2->getStatus().getLastSyncToken()); - EXPECT_EQUAL(1u, a3->getStatus().getLastSyncToken()); - EXPECT_EQUAL(1u, a4->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, a1->getNumDocs()); + put(1, *idb.startDocument("id:ns:searchdocument::1").endDocument(), 1); + EXPECT_EQ(2u, a1->getNumDocs()); + EXPECT_EQ(2u, a2->getNumDocs()); + EXPECT_EQ(2u, a3->getNumDocs()); + EXPECT_EQ(2u, a4->getNumDocs()); + EXPECT_EQ(1u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, a2->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, a3->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, a4->getStatus().getLastSyncToken()); ibuf.fill(*a1, 1); - EXPECT_EQUAL(1u, ibuf.size()); + EXPECT_EQ(1u, ibuf.size()); EXPECT_TRUE(search::attribute::isUndefined<int32_t>(ibuf[0])); ibuf.fill(*a2, 1); - EXPECT_EQUAL(0u, ibuf.size()); + EXPECT_EQ(0u, ibuf.size()); fbuf.fill(*a3, 1); - EXPECT_EQUAL(1u, fbuf.size()); + EXPECT_EQ(1u, fbuf.size()); EXPECT_TRUE(search::attribute::isUndefined<float>(fbuf[0])); sbuf.fill(*a4, 1); - EXPECT_EQUAL(1u, sbuf.size()); - EXPECT_EQUAL(strcmp("", sbuf[0]), 0); + EXPECT_EQ(1u, sbuf.size()); + EXPECT_EQ(strcmp("", sbuf[0]), 0); } { // document with single value & multi value attribute - Document::UP doc = idb.startDocument("id:ns:searchdocument::2"). + auto doc = idb.startDocument("id:ns:searchdocument::2"). startAttributeField("a1").addInt(10).endField(). startAttributeField("a2").startElement().addInt(20).endElement(). startElement().addInt(30).endElement().endField().endDocument(); - f.put(2, *doc, 2); - EXPECT_EQUAL(3u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a2->getNumDocs()); - EXPECT_EQUAL(2u, a1->getStatus().getLastSyncToken()); - EXPECT_EQUAL(2u, a2->getStatus().getLastSyncToken()); - EXPECT_EQUAL(2u, a3->getStatus().getLastSyncToken()); - EXPECT_EQUAL(2u, a4->getStatus().getLastSyncToken()); + put(2, *doc, 2); + EXPECT_EQ(3u, a1->getNumDocs()); + EXPECT_EQ(3u, a2->getNumDocs()); + EXPECT_EQ(2u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(2u, a2->getStatus().getLastSyncToken()); + EXPECT_EQ(2u, a3->getStatus().getLastSyncToken()); + EXPECT_EQ(2u, a4->getStatus().getLastSyncToken()); ibuf.fill(*a1, 2); - EXPECT_EQUAL(1u, ibuf.size()); - EXPECT_EQUAL(10u, ibuf[0]); + EXPECT_EQ(1u, ibuf.size()); + EXPECT_EQ(10u, ibuf[0]); ibuf.fill(*a2, 2); - EXPECT_EQUAL(2u, ibuf.size()); - EXPECT_EQUAL(20u, ibuf[0]); - EXPECT_EQUAL(30u, ibuf[1]); + EXPECT_EQ(2u, ibuf.size()); + EXPECT_EQ(20u, ibuf[0]); + EXPECT_EQ(30u, ibuf[1]); } { // replace existing document - Document::UP doc = idb.startDocument("id:ns:searchdocument::2"). + auto doc = idb.startDocument("id:ns:searchdocument::2"). startAttributeField("a1").addInt(100).endField(). startAttributeField("a2").startElement().addInt(200).endElement(). startElement().addInt(300).endElement(). startElement().addInt(400).endElement().endField().endDocument(); - f.put(3, *doc, 2); - EXPECT_EQUAL(3u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a2->getNumDocs()); - EXPECT_EQUAL(3u, a1->getStatus().getLastSyncToken()); - EXPECT_EQUAL(3u, a2->getStatus().getLastSyncToken()); - EXPECT_EQUAL(3u, a3->getStatus().getLastSyncToken()); - EXPECT_EQUAL(3u, a4->getStatus().getLastSyncToken()); + put(3, *doc, 2); + EXPECT_EQ(3u, a1->getNumDocs()); + EXPECT_EQ(3u, a2->getNumDocs()); + EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(3u, a2->getStatus().getLastSyncToken()); + EXPECT_EQ(3u, a3->getStatus().getLastSyncToken()); + EXPECT_EQ(3u, a4->getStatus().getLastSyncToken()); ibuf.fill(*a1, 2); - EXPECT_EQUAL(1u, ibuf.size()); - EXPECT_EQUAL(100u, ibuf[0]); + EXPECT_EQ(1u, ibuf.size()); + EXPECT_EQ(100u, ibuf[0]); ibuf.fill(*a2, 2); - EXPECT_EQUAL(3u, ibuf.size()); - EXPECT_EQUAL(200u, ibuf[0]); - EXPECT_EQUAL(300u, ibuf[1]); - EXPECT_EQUAL(400u, ibuf[2]); + EXPECT_EQ(3u, ibuf.size()); + EXPECT_EQ(200u, ibuf[0]); + EXPECT_EQ(300u, ibuf[1]); + EXPECT_EQ(400u, ibuf[2]); } } -TEST_F("require that attribute writer handles predicate put", Fixture) +TEST_F(AttributeWriterTest, handles_predicate_put) { Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); DocBuilder idb(s); - AttributeVector::SP a1 = f.addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); PredicateIndex &index = static_cast<PredicateAttribute &>(*a1).getIndex(); // empty document should give default values - EXPECT_EQUAL(1u, a1->getNumDocs()); - f.put(1, *idb.startDocument("id:ns:searchdocument::1").endDocument(), 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); - EXPECT_EQUAL(1u, a1->getStatus().getLastSyncToken()); - EXPECT_EQUAL(0u, index.getZeroConstraintDocs().size()); + EXPECT_EQ(1u, a1->getNumDocs()); + put(1, *idb.startDocument("id:ns:searchdocument::1").endDocument(), 1); + EXPECT_EQ(2u, a1->getNumDocs()); + EXPECT_EQ(1u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(0u, index.getZeroConstraintDocs().size()); // document with single value attribute PredicateSlimeBuilder builder; - Document::UP doc = + auto doc = idb.startDocument("id:ns:searchdocument::2").startAttributeField("a1") .addPredicate(builder.true_predicate().build()) .endField().endDocument(); - f.put(2, *doc, 2); - EXPECT_EQUAL(3u, a1->getNumDocs()); - EXPECT_EQUAL(2u, a1->getStatus().getLastSyncToken()); - EXPECT_EQUAL(1u, index.getZeroConstraintDocs().size()); + put(2, *doc, 2); + EXPECT_EQ(3u, a1->getNumDocs()); + EXPECT_EQ(2u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, index.getZeroConstraintDocs().size()); auto it = index.getIntervalIndex().lookup(PredicateHash::hash64("foo=bar")); EXPECT_FALSE(it.valid()); @@ -303,9 +312,9 @@ TEST_F("require that attribute writer handles predicate put", Fixture) doc = idb.startDocument("id:ns:searchdocument::2").startAttributeField("a1") .addPredicate(builder.feature("foo").value("bar").build()) .endField().endDocument(); - f.put(3, *doc, 2); - EXPECT_EQUAL(3u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a1->getStatus().getLastSyncToken()); + put(3, *doc, 2); + EXPECT_EQ(3u, a1->getNumDocs()); + EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); it = index.getIntervalIndex().lookup(PredicateHash::hash64("foo=bar")); EXPECT_TRUE(it.valid()); @@ -317,21 +326,21 @@ assertUndefined(const IAttributeVector &attr, uint32_t docId) EXPECT_TRUE(search::attribute::isUndefined<int32_t>(attr.getInt(docId))); } -TEST_F("require that attribute writer handles remove", Fixture) +TEST_F(AttributeWriterTest, handles_remove) { - AttributeVector::SP a1 = f.addAttribute("a1"); - AttributeVector::SP a2 = f.addAttribute("a2"); + auto a1 = addAttribute("a1"); + auto a2 = addAttribute("a2"); fillAttribute(a1, 1, 10, 1); fillAttribute(a2, 1, 20, 1); - f.remove(2, 0); + remove(2, 0); - TEST_DO(assertUndefined(*a1, 0)); - TEST_DO(assertUndefined(*a2, 0)); + assertUndefined(*a1, 0); + assertUndefined(*a2, 0); - f.remove(2, 0); // same sync token as previous + remove(2, 0); // same sync token as previous try { - f.remove(1, 0); // lower sync token than previous + remove(1, 0); // lower sync token than previous EXPECT_TRUE(true); // update is ignored } catch (vespalib::IllegalStateException & e) { LOG(info, "Got expected exception: '%s'", e.getMessage().c_str()); @@ -339,62 +348,63 @@ TEST_F("require that attribute writer handles remove", Fixture) } } -TEST_F("require that attribute writer handles batch remove", Fixture) +TEST_F(AttributeWriterTest, handles_batch_remove) { - AttributeVector::SP a1 = f.addAttribute("a1"); - AttributeVector::SP a2 = f.addAttribute("a2"); + auto a1 = addAttribute("a1"); + auto a2 = addAttribute("a2"); fillAttribute(a1, 4, 22, 1); fillAttribute(a2, 4, 33, 1); LidVector lidsToRemove = {1,3}; - f.remove(lidsToRemove, 2); - - TEST_DO(assertUndefined(*a1, 1)); - EXPECT_EQUAL(22, a1->getInt(2)); - TEST_DO(assertUndefined(*a1, 3)); - TEST_DO(assertUndefined(*a2, 1)); - EXPECT_EQUAL(33, a2->getInt(2)); - TEST_DO(assertUndefined(*a2, 3)); + remove(lidsToRemove, 2); + + assertUndefined(*a1, 1); + EXPECT_EQ(22, a1->getInt(2)); + assertUndefined(*a1, 3); + assertUndefined(*a2, 1); + EXPECT_EQ(33, a2->getInt(2)); + assertUndefined(*a2, 3); } -void verifyAttributeContent(const AttributeVector & v, uint32_t lid, vespalib::stringref expected) +void +verifyAttributeContent(const AttributeVector & v, uint32_t lid, vespalib::stringref expected) { attribute::ConstCharContent sbuf; sbuf.fill(v, lid); - EXPECT_EQUAL(1u, sbuf.size()); - EXPECT_EQUAL(expected, sbuf[0]); + EXPECT_EQ(1u, sbuf.size()); + EXPECT_EQ(expected, sbuf[0]); } -TEST_F("require that visibilitydelay is honoured", Fixture) +TEST_F(AttributeWriterTest, visibility_delay_is_honoured) { - AttributeVector::SP a1 = f.addAttribute({"a1", AVConfig(AVBasicType::STRING)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::STRING)}); Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::STRING, CollectionType::SINGLE)); DocBuilder idb(s); - EXPECT_EQUAL(1u, a1->getNumDocs()); - EXPECT_EQUAL(0u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(1u, a1->getNumDocs()); + EXPECT_EQ(0u, a1->getStatus().getLastSyncToken()); Document::UP doc = idb.startDocument("id:ns:searchdocument::1") .startAttributeField("a1").addStr("10").endField() .endDocument(); - f.put(3, *doc, 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayed(f._m); + put(3, *doc, 1); + EXPECT_EQ(2u, a1->getNumDocs()); + EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); + AttributeWriter awDelayed(_mgr); awDelayed.put(4, *doc, 2, false, emptyCallback); - EXPECT_EQUAL(3u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(3u, a1->getNumDocs()); + EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); awDelayed.put(5, *doc, 4, false, emptyCallback); - EXPECT_EQUAL(5u, a1->getNumDocs()); - EXPECT_EQUAL(3u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(5u, a1->getNumDocs()); + EXPECT_EQ(3u, a1->getStatus().getLastSyncToken()); awDelayed.forceCommit(6, emptyCallback); - EXPECT_EQUAL(6u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); - AttributeWriter awDelayedShort(f._m); + AttributeWriter awDelayedShort(_mgr); awDelayedShort.put(7, *doc, 2, false, emptyCallback); - EXPECT_EQUAL(6u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(6u, a1->getStatus().getLastSyncToken()); awDelayedShort.put(8, *doc, 2, false, emptyCallback); awDelayedShort.forceCommit(8, emptyCallback); - EXPECT_EQUAL(8u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(8u, a1->getStatus().getLastSyncToken()); verifyAttributeContent(*a1, 2, "10"); awDelayed.put(9, *idb.startDocument("id:ns:searchdocument::1").startAttributeField("a1").addStr("11").endField().endDocument(), @@ -403,40 +413,39 @@ TEST_F("require that visibilitydelay is honoured", Fixture) 2, false, emptyCallback); awDelayed.put(11, *idb.startDocument("id:ns:searchdocument::1").startAttributeField("a1").addStr("30").endField().endDocument(), 2, false, emptyCallback); - EXPECT_EQUAL(8u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(8u, a1->getStatus().getLastSyncToken()); verifyAttributeContent(*a1, 2, "10"); awDelayed.forceCommit(12, emptyCallback); - EXPECT_EQUAL(12u, a1->getStatus().getLastSyncToken()); + EXPECT_EQ(12u, a1->getStatus().getLastSyncToken()); verifyAttributeContent(*a1, 2, "30"); - } -TEST_F("require that attribute writer handles predicate remove", Fixture) +TEST_F(AttributeWriterTest, handles_predicate_remove) { - AttributeVector::SP a1 = f.addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); Schema s; s.addAttributeField( Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); DocBuilder idb(s); PredicateSlimeBuilder builder; - Document::UP doc = + auto doc = idb.startDocument("id:ns:searchdocument::1").startAttributeField("a1") .addPredicate(builder.true_predicate().build()) .endField().endDocument(); - f.put(1, *doc, 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); + put(1, *doc, 1); + EXPECT_EQ(2u, a1->getNumDocs()); PredicateIndex &index = static_cast<PredicateAttribute &>(*a1).getIndex(); - EXPECT_EQUAL(1u, index.getZeroConstraintDocs().size()); - f.remove(2, 1); - EXPECT_EQUAL(0u, index.getZeroConstraintDocs().size()); + EXPECT_EQ(1u, index.getZeroConstraintDocs().size()); + remove(2, 1); + EXPECT_EQ(0u, index.getZeroConstraintDocs().size()); } -TEST_F("require that attribute writer handles update", Fixture) +TEST_F(AttributeWriterTest, handles_update) { - AttributeVector::SP a1 = f.addAttribute("a1"); - AttributeVector::SP a2 = f.addAttribute("a2"); + auto a1 = addAttribute("a1"); + auto a2 = addAttribute("a2"); fillAttribute(a1, 1, 10, 1); fillAttribute(a2, 1, 20, 1); @@ -454,19 +463,19 @@ TEST_F("require that attribute writer handles update", Fixture) DummyFieldUpdateCallback onUpdate; bool immediateCommit = true; - f.update(2, upd, 1, immediateCommit, onUpdate); + update(2, upd, 1, immediateCommit, onUpdate); attribute::IntegerContent ibuf; ibuf.fill(*a1, 1); - EXPECT_EQUAL(1u, ibuf.size()); - EXPECT_EQUAL(15u, ibuf[0]); + EXPECT_EQ(1u, ibuf.size()); + EXPECT_EQ(15u, ibuf[0]); ibuf.fill(*a2, 1); - EXPECT_EQUAL(1u, ibuf.size()); - EXPECT_EQUAL(30u, ibuf[0]); + EXPECT_EQ(1u, ibuf.size()); + EXPECT_EQ(30u, ibuf[0]); - f.update(2, upd, 1, immediateCommit, onUpdate); // same sync token as previous + update(2, upd, 1, immediateCommit, onUpdate); // same sync token as previous try { - f.update(1, upd, 1, immediateCommit, onUpdate); // lower sync token than previous + update(1, upd, 1, immediateCommit, onUpdate); // lower sync token than previous EXPECT_TRUE(true); // update is ignored } catch (vespalib::IllegalStateException & e) { LOG(info, "Got expected exception: '%s'", e.getMessage().c_str()); @@ -474,20 +483,20 @@ TEST_F("require that attribute writer handles update", Fixture) } } -TEST_F("require that attribute writer handles predicate update", Fixture) +TEST_F(AttributeWriterTest, handles_predicate_update) { - AttributeVector::SP a1 = f.addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}, createSerialNum); + auto a1 = addAttribute({"a1", AVConfig(AVBasicType::PREDICATE)}); Schema schema; schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::BOOLEANTREE, CollectionType::SINGLE)); DocBuilder idb(schema); PredicateSlimeBuilder builder; - Document::UP doc = + auto doc = idb.startDocument("id:ns:searchdocument::1").startAttributeField("a1") .addPredicate(builder.true_predicate().build()) .endField().endDocument(); - f.put(1, *doc, 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); + put(1, *doc, 1); + EXPECT_EQ(2u, a1->getNumDocs()); const document::DocumentType &dt(idb.getDocumentType()); DocumentUpdate upd(*idb.getDocumentTypeRepo(), dt, DocumentId("id:ns:searchdocument::1")); @@ -496,20 +505,20 @@ TEST_F("require that attribute writer handles predicate update", Fixture) .addUpdate(AssignValueUpdate(new_value))); PredicateIndex &index = static_cast<PredicateAttribute &>(*a1).getIndex(); - EXPECT_EQUAL(1u, index.getZeroConstraintDocs().size()); + EXPECT_EQ(1u, index.getZeroConstraintDocs().size()); EXPECT_FALSE(index.getIntervalIndex().lookup(PredicateHash::hash64("foo=bar")).valid()); bool immediateCommit = true; DummyFieldUpdateCallback onUpdate; - f.update(2, upd, 1, immediateCommit, onUpdate); - EXPECT_EQUAL(0u, index.getZeroConstraintDocs().size()); + update(2, upd, 1, immediateCommit, onUpdate); + EXPECT_EQ(0u, index.getZeroConstraintDocs().size()); EXPECT_TRUE(index.getIntervalIndex().lookup(PredicateHash::hash64("foo=bar")).valid()); } -struct AttributeCollectionSpecFixture -{ +class AttributeCollectionSpecTest : public ::testing::Test { +public: AttributesConfigBuilder _builder; AttributeCollectionSpecFactory _factory; - AttributeCollectionSpecFixture(bool fastAccessOnly) + AttributeCollectionSpecTest(bool fastAccessOnly) : _builder(), _factory(search::GrowStrategy(), 100, fastAccessOnly) { @@ -528,49 +537,47 @@ struct AttributeCollectionSpecFixture } }; -struct NormalAttributeCollectionSpecFixture : public AttributeCollectionSpecFixture -{ - NormalAttributeCollectionSpecFixture() : AttributeCollectionSpecFixture(false) {} +class NormalAttributeCollectionSpecTest : public AttributeCollectionSpecTest { +public: + NormalAttributeCollectionSpecTest() : AttributeCollectionSpecTest(false) {} }; -struct FastAccessAttributeCollectionSpecFixture : public AttributeCollectionSpecFixture +struct FastAccessAttributeCollectionSpecTest : public AttributeCollectionSpecTest { - FastAccessAttributeCollectionSpecFixture() : AttributeCollectionSpecFixture(true) {} + FastAccessAttributeCollectionSpecTest() : AttributeCollectionSpecTest(true) {} }; -TEST_F("require that normal attribute collection spec can be created", - NormalAttributeCollectionSpecFixture) +TEST_F(NormalAttributeCollectionSpecTest, spec_can_be_created) { - AttributeCollectionSpec::UP spec = f.create(10, 20); - EXPECT_EQUAL(2u, spec->getAttributes().size()); - EXPECT_EQUAL("a1", spec->getAttributes()[0].getName()); - EXPECT_EQUAL("a2", spec->getAttributes()[1].getName()); - EXPECT_EQUAL(10u, spec->getDocIdLimit()); - EXPECT_EQUAL(20u, spec->getCurrentSerialNum()); + AttributeCollectionSpec::UP spec = create(10, 20); + EXPECT_EQ(2u, spec->getAttributes().size()); + EXPECT_EQ("a1", spec->getAttributes()[0].getName()); + EXPECT_EQ("a2", spec->getAttributes()[1].getName()); + EXPECT_EQ(10u, spec->getDocIdLimit()); + EXPECT_EQ(20u, spec->getCurrentSerialNum()); } -TEST_F("require that fast access attribute collection spec can be created", - FastAccessAttributeCollectionSpecFixture) +TEST_F(FastAccessAttributeCollectionSpecTest, spec_can_be_created) { - AttributeCollectionSpec::UP spec = f.create(10, 20); - EXPECT_EQUAL(1u, spec->getAttributes().size()); - EXPECT_EQUAL("a2", spec->getAttributes()[0].getName()); - EXPECT_EQUAL(10u, spec->getDocIdLimit()); - EXPECT_EQUAL(20u, spec->getCurrentSerialNum()); + AttributeCollectionSpec::UP spec = create(10, 20); + EXPECT_EQ(1u, spec->getAttributes().size()); + EXPECT_EQ("a2", spec->getAttributes()[0].getName()); + EXPECT_EQ(10u, spec->getDocIdLimit()); + EXPECT_EQ(20u, spec->getCurrentSerialNum()); } const FilterAttributeManager::AttributeSet ACCEPTED_ATTRIBUTES = {"a2"}; -struct FilterFixture -{ +class FilterAttributeManagerTest : public ::testing::Test { +public: DirectoryHandler _dirHandler; DummyFileHeaderContext _fileHeaderContext; ForegroundTaskExecutor _attributeFieldWriter; HwInfo _hwInfo; - proton::AttributeManager::SP _baseMgr; FilterAttributeManager _filterMgr; - FilterFixture() + + FilterAttributeManagerTest() : _dirHandler(test_dir), _fileHeaderContext(), _attributeFieldWriter(), @@ -587,34 +594,34 @@ struct FilterFixture } }; -TEST_F("require that filter attribute manager can filter attributes", FilterFixture) +TEST_F(FilterAttributeManagerTest, filter_attributes) { - EXPECT_TRUE(f._filterMgr.getAttribute("a1").get() == nullptr); - EXPECT_TRUE(f._filterMgr.getAttribute("a2").get() != nullptr); + EXPECT_TRUE(_filterMgr.getAttribute("a1").get() == nullptr); + EXPECT_TRUE(_filterMgr.getAttribute("a2").get() != nullptr); std::vector<AttributeGuard> attrs; - f._filterMgr.getAttributeList(attrs); - EXPECT_EQUAL(1u, attrs.size()); - EXPECT_EQUAL("a2", attrs[0]->getName()); - searchcorespi::IFlushTarget::List targets = f._filterMgr.getFlushTargets(); - EXPECT_EQUAL(2u, targets.size()); - EXPECT_EQUAL("attribute.flush.a2", targets[0]->getName()); - EXPECT_EQUAL("attribute.shrink.a2", targets[1]->getName()); + _filterMgr.getAttributeList(attrs); + EXPECT_EQ(1u, attrs.size()); + EXPECT_EQ("a2", attrs[0]->getName()); + searchcorespi::IFlushTarget::List targets = _filterMgr.getFlushTargets(); + EXPECT_EQ(2u, targets.size()); + EXPECT_EQ("attribute.flush.a2", targets[0]->getName()); + EXPECT_EQ("attribute.shrink.a2", targets[1]->getName()); } -TEST_F("require that filter attribute manager can return flushed serial number", FilterFixture) +TEST_F(FilterAttributeManagerTest, returns_flushed_serial_number) { - f._baseMgr->flushAll(100); - EXPECT_EQUAL(0u, f._filterMgr.getFlushedSerialNum("a1")); - EXPECT_EQUAL(100u, f._filterMgr.getFlushedSerialNum("a2")); + _baseMgr->flushAll(100); + EXPECT_EQ(0u, _filterMgr.getFlushedSerialNum("a1")); + EXPECT_EQ(100u, _filterMgr.getFlushedSerialNum("a2")); } -TEST_F("readable_attribute_vector filters attributes", FilterFixture) +TEST_F(FilterAttributeManagerTest, readable_attribute_vector_filters_attributes) { - auto av = f._filterMgr.readable_attribute_vector("a2"); + auto av = _filterMgr.readable_attribute_vector("a2"); ASSERT_TRUE(av); - EXPECT_EQUAL("a2", av->makeReadGuard(false)->attribute()->getName()); + EXPECT_EQ("a2", av->makeReadGuard(false)->attribute()->getName()); - av = f._filterMgr.readable_attribute_vector("a1"); + av = _filterMgr.readable_attribute_vector("a1"); EXPECT_FALSE(av); } @@ -625,18 +632,20 @@ Tensor::UP make_tensor(const TensorSpec &spec) { return Tensor::UP(dynamic_cast<Tensor*>(tensor.release())); } +const vespalib::string sparse_tensor = "tensor(x{},y{})"; + AttributeVector::SP -createTensorAttribute(Fixture &f) { +createTensorAttribute(AttributeWriterTest &t) { AVConfig cfg(AVBasicType::TENSOR); - cfg.setTensorType(ValueType::from_spec("tensor(x{},y{})")); - auto ret = f.addAttribute({"a1", cfg}, createSerialNum); + cfg.setTensorType(ValueType::from_spec(sparse_tensor)); + auto ret = t.addAttribute({"a1", cfg}); return ret; } Schema -createTensorSchema() { +createTensorSchema(const vespalib::string& tensor_spec = sparse_tensor) { Schema schema; - schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE)); + schema.addAttributeField(Schema::AttributeField("a1", schema::DataType::TENSOR, CollectionType::SINGLE, tensor_spec)); return schema; } @@ -649,38 +658,34 @@ createTensorPutDoc(DocBuilder &builder, const Tensor &tensor) { } - -TEST_F("Test that we can use attribute writer to write to tensor attribute", - Fixture) +TEST_F(AttributeWriterTest, can_write_to_tensor_attribute) { - AttributeVector::SP a1 = createTensorAttribute(f); + auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "4"}, {"y", "5"}}, 7)); Document::UP doc = createTensorPutDoc(builder, *tensor); - f.put(1, *doc, 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); - TensorAttribute *tensorAttribute = - dynamic_cast<TensorAttribute *>(a1.get()); + put(1, *doc, 1); + EXPECT_EQ(2u, a1->getNumDocs()); + auto *tensorAttribute = dynamic_cast<TensorAttribute *>(a1.get()); EXPECT_TRUE(tensorAttribute != nullptr); auto tensor2 = tensorAttribute->getTensor(1); EXPECT_TRUE(static_cast<bool>(tensor2)); EXPECT_TRUE(tensor->equals(*tensor2)); } -TEST_F("require that attribute writer handles tensor assign update", Fixture) +TEST_F(AttributeWriterTest, handles_tensor_assign_update) { - AttributeVector::SP a1 = createTensorAttribute(f); + auto a1 = createTensorAttribute(*this); Schema s = createTensorSchema(); DocBuilder builder(s); - auto tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "6"}, {"y", "7"}}, 9)); - Document::UP doc = createTensorPutDoc(builder, *tensor); - f.put(1, *doc, 1); - EXPECT_EQUAL(2u, a1->getNumDocs()); - TensorAttribute *tensorAttribute = - dynamic_cast<TensorAttribute *>(a1.get()); + auto doc = createTensorPutDoc(builder, *tensor); + put(1, *doc, 1); + EXPECT_EQ(2u, a1->getNumDocs()); + auto *tensorAttribute = dynamic_cast<TensorAttribute *>(a1.get()); EXPECT_TRUE(tensorAttribute != nullptr); auto tensor2 = tensorAttribute->getTensor(1); EXPECT_TRUE(static_cast<bool>(tensor2)); @@ -688,23 +693,22 @@ TEST_F("require that attribute writer handles tensor assign update", Fixture) const document::DocumentType &dt(builder.getDocumentType()); DocumentUpdate upd(*builder.getDocumentTypeRepo(), dt, DocumentId("id:ns:searchdocument::1")); - auto new_tensor = make_tensor(TensorSpec("tensor(x{},y{})") + auto new_tensor = make_tensor(TensorSpec(sparse_tensor) .add({{"x", "8"}, {"y", "9"}}, 11)); - TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec("tensor(x{},y{})")); + TensorDataType xySparseTensorDataType(vespalib::eval::ValueType::from_spec(sparse_tensor)); TensorFieldValue new_value(xySparseTensorDataType); new_value = new_tensor->clone(); upd.addUpdate(FieldUpdate(upd.getType().getField("a1")) .addUpdate(AssignValueUpdate(new_value))); bool immediateCommit = true; DummyFieldUpdateCallback onUpdate; - f.update(2, upd, 1, immediateCommit, onUpdate); - EXPECT_EQUAL(2u, a1->getNumDocs()); + update(2, upd, 1, immediateCommit, onUpdate); + EXPECT_EQ(2u, a1->getNumDocs()); EXPECT_TRUE(tensorAttribute != nullptr); tensor2 = tensorAttribute->getTensor(1); EXPECT_TRUE(static_cast<bool>(tensor2)); EXPECT_TRUE(!tensor->equals(*tensor2)); EXPECT_TRUE(new_tensor->equals(*tensor2)); - } namespace { @@ -712,16 +716,16 @@ namespace { void assertPutDone(AttributeVector &attr, int32_t expVal) { - EXPECT_EQUAL(2u, attr.getNumDocs()); - EXPECT_EQUAL(1u, attr.getStatus().getLastSyncToken()); + EXPECT_EQ(2u, attr.getNumDocs()); + EXPECT_EQ(1u, attr.getStatus().getLastSyncToken()); attribute::IntegerContent ibuf; ibuf.fill(attr, 1); - EXPECT_EQUAL(1u, ibuf.size()); - EXPECT_EQUAL(expVal, ibuf[0]); + EXPECT_EQ(1u, ibuf.size()); + EXPECT_EQ(expVal, ibuf[0]); } void -putAttributes(Fixture &f, std::vector<uint32_t> expExecuteHistory) +putAttributes(AttributeWriterTest &t, std::vector<uint32_t> expExecuteHistory) { Schema s; s.addAttributeField(Schema::AttributeField("a1", schema::DataType::INT32, CollectionType::SINGLE)); @@ -730,41 +734,205 @@ putAttributes(Fixture &f, std::vector<uint32_t> expExecuteHistory) DocBuilder idb(s); - AttributeVector::SP a1 = f.addAttribute("a1"); - AttributeVector::SP a2 = f.addAttribute("a2"); - AttributeVector::SP a3 = f.addAttribute("a3"); + auto a1 = t.addAttribute("a1"); + auto a2 = t.addAttribute("a2"); + auto a3 = t.addAttribute("a3"); - EXPECT_EQUAL(1u, a1->getNumDocs()); - EXPECT_EQUAL(1u, a2->getNumDocs()); - EXPECT_EQUAL(1u, a3->getNumDocs()); - f.put(1, *idb.startDocument("id:ns:searchdocument::1"). + EXPECT_EQ(1u, a1->getNumDocs()); + EXPECT_EQ(1u, a2->getNumDocs()); + EXPECT_EQ(1u, a3->getNumDocs()); + t.put(1, *idb.startDocument("id:ns:searchdocument::1"). startAttributeField("a1").addInt(10).endField(). startAttributeField("a2").addInt(15).endField(). startAttributeField("a3").addInt(20).endField(). endDocument(), 1); - TEST_DO(assertPutDone(*a1, 10)); - TEST_DO(assertPutDone(*a2, 15)); - TEST_DO(assertPutDone(*a3, 20)); - TEST_DO(f.assertExecuteHistory(expExecuteHistory)); + assertPutDone(*a1, 10); + assertPutDone(*a2, 15); + assertPutDone(*a3, 20); + t.assertExecuteHistory(expExecuteHistory); +} + +} + +TEST_F(AttributeWriterTest, spreads_write_over_1_write_context) +{ + putAttributes(*this, {0}); } +TEST_F(AttributeWriterTest, spreads_write_over_2_write_contexts) +{ + setup(2); + putAttributes(*this, {0, 1}); } -TEST_F("require that attribute writer spreads write over 1 write context", Fixture(1)) +TEST_F(AttributeWriterTest, spreads_write_over_3_write_contexts) { - TEST_DO(putAttributes(f, {0})); + setup(8); + putAttributes(*this, {0, 1, 2}); } -TEST_F("require that attribute writer spreads write over 2 write contexts", Fixture(2)) +struct MockPrepareResult : public PrepareResult { + uint32_t docid; + const Tensor& tensor; + MockPrepareResult(uint32_t docid_in, const Tensor& tensor_in) : docid(docid_in), tensor(tensor_in) {} +}; + +class MockDenseTensorAttribute : public DenseTensorAttribute { +public: + mutable size_t prepare_set_tensor_cnt; + mutable size_t complete_set_tensor_cnt; + size_t clear_doc_cnt; + + MockDenseTensorAttribute(vespalib::stringref name, const AVConfig& cfg) + : DenseTensorAttribute(name, cfg), + prepare_set_tensor_cnt(0), + complete_set_tensor_cnt(0), + clear_doc_cnt(0) + {} + uint32_t clearDoc(DocId docid) override { + ++clear_doc_cnt; + return DenseTensorAttribute::clearDoc(docid); + } + std::unique_ptr<PrepareResult> prepare_set_tensor(uint32_t docid, const Tensor& tensor) const override { + ++prepare_set_tensor_cnt; + return std::make_unique<MockPrepareResult>(docid, tensor); + } + + virtual void complete_set_tensor(DocId docid, const Tensor& tensor, std::unique_ptr<PrepareResult> prepare_result) override { + ++complete_set_tensor_cnt; + assert(prepare_result); + auto* mock_result = dynamic_cast<MockPrepareResult*>(prepare_result.get()); + assert(mock_result); + EXPECT_EQ(docid, mock_result->docid); + EXPECT_EQ(tensor, mock_result->tensor); + } +}; + +const vespalib::string dense_tensor = "tensor(x[2])"; + +AVConfig +get_tensor_config(bool allow_multi_threaded_indexing) { - TEST_DO(putAttributes(f, {0, 1})); + AVConfig cfg(AVBasicType::TENSOR); + cfg.setTensorType(ValueType::from_spec(dense_tensor)); + cfg.set_hnsw_index_params(HnswIndexParams(4, 4, DistanceMetric::Euclidean, allow_multi_threaded_indexing)); + return cfg; } -TEST_F("require that attribute writer spreads write over 3 write contexts", Fixture(8)) +std::shared_ptr<MockDenseTensorAttribute> +make_mock_tensor_attribute(const vespalib::string& name, bool allow_multi_threaded_indexing) { - TEST_DO(putAttributes(f, {0, 1, 2})); + auto cfg = get_tensor_config(allow_multi_threaded_indexing); + return std::make_shared<MockDenseTensorAttribute>(name, cfg); } +TEST_F(AttributeWriterTest, tensor_attributes_using_two_phase_put_are_in_separate_write_contexts) +{ + addAttribute("a1"); + addAttribute({"t1", get_tensor_config(true)}); + addAttribute({"t2", get_tensor_config(true)}); + addAttribute({"t3", get_tensor_config(false)}); + allocAttributeWriter(); + + const auto& ctx = _aw->get_write_contexts(); + EXPECT_EQ(3, ctx.size()); + EXPECT_FALSE(ctx[0].use_two_phase_put()); + EXPECT_EQ(2, ctx[0].getFields().size()); + + EXPECT_TRUE(ctx[1].use_two_phase_put()); + EXPECT_EQ(1, ctx[1].getFields().size()); + EXPECT_EQ("t1", ctx[1].getFields()[0].getAttribute().getName()); + + EXPECT_TRUE(ctx[2].use_two_phase_put()); + EXPECT_EQ(1, ctx[2].getFields().size()); + EXPECT_EQ("t2", ctx[2].getFields()[0].getAttribute().getName()); +} + +class TwoPhasePutTest : public AttributeWriterTest { +public: + Schema schema; + DocBuilder builder; + std::shared_ptr<MockDenseTensorAttribute> attr; + std::unique_ptr<Tensor> tensor; + + TwoPhasePutTest() + : AttributeWriterTest(), + schema(createTensorSchema(dense_tensor)), + builder(schema), + attr() + { + setup(2); + attr = make_mock_tensor_attribute("a1", true); + add_attribute(attr); + AttributeManager::padAttribute(*attr, 4); + attr->clear_doc_cnt = 0; + tensor = make_tensor(TensorSpec(dense_tensor) + .add({{"x", 0}}, 3).add({{"x", 1}}, 5)); + } + void expect_tensor_attr_calls(size_t exp_prepare_cnt, + size_t exp_complete_cnt, + size_t exp_clear_doc_cnt = 0) { + EXPECT_EQ(exp_prepare_cnt, attr->prepare_set_tensor_cnt); + EXPECT_EQ(exp_complete_cnt, attr->complete_set_tensor_cnt); + EXPECT_EQ(exp_clear_doc_cnt, attr->clear_doc_cnt); + } + Document::UP make_doc() { + return createTensorPutDoc(builder, *tensor); + } + Document::UP make_no_field_doc() { + return builder.startDocument("id:ns:searchdocument::1").endDocument(); + } + Document::UP make_no_tensor_doc() { + return builder.startDocument("id:ns:searchdocument::1"). + startAttributeField("a1"). + addTensor(std::unique_ptr<vespalib::tensor::Tensor>()).endField().endDocument(); + } +}; + +TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attribute) +{ + auto doc = make_doc(); + + put(1, *doc, 1); + expect_tensor_attr_calls(1, 1); + assertExecuteHistory({1, 0}); + + put(2, *doc, 2); + expect_tensor_attr_calls(2, 2); + assertExecuteHistory({1, 0, 0, 0}); + + put(3, *doc, 3); + expect_tensor_attr_calls(3, 3); + // Note that the prepare step is executed round-robin between the 2 threads. + assertExecuteHistory({1, 0, 0, 0, 1, 0}); +} + +TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_attribute) +{ + auto doc = make_doc(); + attr->commit(7, 7); + put(7, *doc, 1); + expect_tensor_attr_calls(0, 0); + assertExecuteHistory({1, 0}); +} + +TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set) +{ + auto doc = make_no_field_doc(); + put(1, *doc, 1); + expect_tensor_attr_calls(0, 0, 1); + assertExecuteHistory({1, 0}); +} + +TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set) +{ + auto doc = make_no_tensor_doc(); + put(1, *doc, 1); + expect_tensor_attr_calls(0, 0, 1); + assertExecuteHistory({1, 0}); +} + + ImportedAttributeVector::SP createImportedAttribute(const vespalib::string &name) { @@ -787,74 +955,66 @@ createImportedAttributesRepo() return result; } -TEST_F("require that AttributeWriter::forceCommit() clears search cache in imported attribute vectors", Fixture) +TEST_F(AttributeWriterTest, forceCommit_clears_search_cache_in_imported_attribute_vectors) { - f._m->setImportedAttributes(createImportedAttributesRepo()); - f.commit(10); - EXPECT_EQUAL(0u, f._m->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); - EXPECT_EQUAL(0u, f._m->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); + _mgr->setImportedAttributes(createImportedAttributesRepo()); + commit(10); + EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_a")->getSearchCache()->size()); + EXPECT_EQ(0u, _mgr->getImportedAttributes()->get("imported_b")->getSearchCache()->size()); } -struct StructFixtureBase : public Fixture -{ +class StructWriterTestBase : public AttributeWriterTest { +public: DocumentType _type; const Field _valueField; StructDataType _structFieldType; - StructFixtureBase() - : Fixture(), + StructWriterTestBase() + : AttributeWriterTest(), _type("test"), _valueField("value", 2, *DataType::INT, true), _structFieldType("struct") { - addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}, createSerialNum); + addAttribute({"value", AVConfig(AVBasicType::INT32, AVCollectionType::SINGLE)}); _type.addField(_valueField); _structFieldType.addField(_valueField); } - ~StructFixtureBase(); + ~StructWriterTestBase(); - std::unique_ptr<StructFieldValue> - makeStruct() - { + std::unique_ptr<StructFieldValue> makeStruct() { return std::make_unique<StructFieldValue>(_structFieldType); } - std::unique_ptr<StructFieldValue> - makeStruct(const int32_t value) - { + std::unique_ptr<StructFieldValue> makeStruct(const int32_t value) { auto ret = makeStruct(); ret->setValue(_valueField, IntFieldValue(value)); return ret; } - std::unique_ptr<Document> - makeDoc() - { + std::unique_ptr<Document> makeDoc() { return std::make_unique<Document>(_type, DocumentId("id::test::1")); } }; -StructFixtureBase::~StructFixtureBase() = default; +StructWriterTestBase::~StructWriterTestBase() = default; -struct StructArrayFixture : public StructFixtureBase -{ - using StructFixtureBase::makeDoc; +class StructArrayWriterTest : public StructWriterTestBase { +public: + using StructWriterTestBase::makeDoc; const ArrayDataType _structArrayFieldType; const Field _structArrayField; - StructArrayFixture() - : StructFixtureBase(), + StructArrayWriterTest() + : StructWriterTestBase(), _structArrayFieldType(_structFieldType), _structArrayField("array", _structArrayFieldType, true) { - addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + addAttribute({"array.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); _type.addField(_structArrayField); } - ~StructArrayFixture(); + ~StructArrayWriterTest(); - std::unique_ptr<Document> - makeDoc(int32_t value, const std::vector<int32_t> &arrayValues) - { + std::unique_ptr<Document> makeDoc(int32_t value, const std::vector<int32_t> &arrayValues) { auto doc = makeDoc(); doc->setValue(_valueField, IntFieldValue(value)); ArrayFieldValue s(_structArrayFieldType); @@ -865,49 +1025,47 @@ struct StructArrayFixture : public StructFixtureBase return doc; } void checkAttrs(uint32_t lid, int32_t value, const std::vector<int32_t> &arrayValues) { - auto valueAttr = _m->getAttribute("value")->getSP(); - auto arrayValueAttr = _m->getAttribute("array.value")->getSP(); - EXPECT_EQUAL(value, valueAttr->getInt(lid)); + auto valueAttr = _mgr->getAttribute("value")->getSP(); + auto arrayValueAttr = _mgr->getAttribute("array.value")->getSP(); + EXPECT_EQ(value, valueAttr->getInt(lid)); attribute::IntegerContent ibuf; ibuf.fill(*arrayValueAttr, lid); - EXPECT_EQUAL(arrayValues.size(), ibuf.size()); + EXPECT_EQ(arrayValues.size(), ibuf.size()); for (size_t i = 0; i < arrayValues.size(); ++i) { - EXPECT_EQUAL(arrayValues[i], ibuf[i]); + EXPECT_EQ(arrayValues[i], ibuf[i]); } } }; -StructArrayFixture::~StructArrayFixture() = default; +StructArrayWriterTest::~StructArrayWriterTest() = default; -TEST_F("require that update with doc argument updates struct field attributes (array)", StructArrayFixture) +TEST_F(StructArrayWriterTest, update_with_doc_argument_updates_struct_field_attributes) { - auto doc = f.makeDoc(10, {11, 12}); - f.put(10, *doc, 1); - TEST_DO(f.checkAttrs(1, 10, {11, 12})); - doc = f.makeDoc(20, {21}); - f.update(11, *doc, 1, true); - TEST_DO(f.checkAttrs(1, 10, {21})); + auto doc = makeDoc(10, {11, 12}); + put(10, *doc, 1); + checkAttrs(1, 10, {11, 12}); + doc = makeDoc(20, {21}); + update(11, *doc, 1, true); + checkAttrs(1, 10, {21}); } -struct StructMapFixture : public StructFixtureBase -{ - using StructFixtureBase::makeDoc; +class StructMapWriterTest : public StructWriterTestBase { +public: + using StructWriterTestBase::makeDoc; const MapDataType _structMapFieldType; const Field _structMapField; - StructMapFixture() - : StructFixtureBase(), + StructMapWriterTest() + : StructWriterTestBase(), _structMapFieldType(*DataType::INT, _structFieldType), _structMapField("map", _structMapFieldType, true) { - addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); - addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}, createSerialNum); + addAttribute({"map.value.value", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); + addAttribute({"map.key", AVConfig(AVBasicType::INT32, AVCollectionType::ARRAY)}); _type.addField(_structMapField); } - std::unique_ptr<Document> - makeDoc(int32_t value, const std::map<int32_t, int32_t> &mapValues) - { + std::unique_ptr<Document> makeDoc(int32_t value, const std::map<int32_t, int32_t> &mapValues) { auto doc = makeDoc(); doc->setValue(_valueField, IntFieldValue(value)); MapFieldValue s(_structMapFieldType); @@ -917,38 +1075,35 @@ struct StructMapFixture : public StructFixtureBase doc->setValue(_structMapField, s); return doc; } + void checkAttrs(uint32_t lid, int32_t expValue, const std::map<int32_t, int32_t> &expMap) { - auto valueAttr = _m->getAttribute("value")->getSP(); - auto mapKeyAttr = _m->getAttribute("map.key")->getSP(); - auto mapValueAttr = _m->getAttribute("map.value.value")->getSP(); - EXPECT_EQUAL(expValue, valueAttr->getInt(lid)); + auto valueAttr = _mgr->getAttribute("value")->getSP(); + auto mapKeyAttr = _mgr->getAttribute("map.key")->getSP(); + auto mapValueAttr = _mgr->getAttribute("map.value.value")->getSP(); + EXPECT_EQ(expValue, valueAttr->getInt(lid)); attribute::IntegerContent mapKeys; mapKeys.fill(*mapKeyAttr, lid); attribute::IntegerContent mapValues; mapValues.fill(*mapValueAttr, lid); - EXPECT_EQUAL(expMap.size(), mapValues.size()); - EXPECT_EQUAL(expMap.size(), mapKeys.size()); + EXPECT_EQ(expMap.size(), mapValues.size()); + EXPECT_EQ(expMap.size(), mapKeys.size()); size_t i = 0; for (const auto &expMapElem : expMap) { - EXPECT_EQUAL(expMapElem.first, mapKeys[i]); - EXPECT_EQUAL(expMapElem.second, mapValues[i]); + EXPECT_EQ(expMapElem.first, mapKeys[i]); + EXPECT_EQ(expMapElem.second, mapValues[i]); ++i; } } }; -TEST_F("require that update with doc argument updates struct field attributes (map)", StructMapFixture) +TEST_F(StructMapWriterTest, update_with_doc_argument_updates_struct_field_attributes) { - auto doc = f.makeDoc(10, {{1, 11}, {2, 12}}); - f.put(10, *doc, 1); - TEST_DO(f.checkAttrs(1, 10, {{1, 11}, {2, 12}})); - doc = f.makeDoc(20, {{42, 21}}); - f.update(11, *doc, 1, true); - TEST_DO(f.checkAttrs(1, 10, {{42, 21}})); + auto doc = makeDoc(10, {{1, 11}, {2, 12}}); + put(10, *doc, 1); + checkAttrs(1, 10, {{1, 11}, {2, 12}}); + doc = makeDoc(20, {{42, 21}}); + update(11, *doc, 1, true); + checkAttrs(1, 10, {{42, 21}}); } -TEST_MAIN() -{ - vespalib::rmdir(test_dir, true); - TEST_RUN_ALL(); -} +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/attribute/attribute_test.sh b/searchcore/src/tests/proton/attribute/attribute_test.sh deleted file mode 100755 index 26aa6d5f57a..00000000000 --- a/searchcore/src/tests/proton/attribute/attribute_test.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -set -e -rm -rf test_output -$VALGRIND ./searchcore_attribute_test_app diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index e1785e1e48d..b6d6d2437d8 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -662,10 +662,11 @@ void addField(Schema & s, const std::string &name, Schema::DataType dtype, - Schema::CollectionType ctype) + Schema::CollectionType ctype, + const std::string& tensor_spec = "") { - s.addSummaryField(Schema::SummaryField(name, dtype, ctype)); - s.addAttributeField(Schema::AttributeField(name, dtype, ctype)); + s.addSummaryField(Schema::SummaryField(name, dtype, ctype, tensor_spec)); + s.addAttributeField(Schema::AttributeField(name, dtype, ctype, tensor_spec)); } @@ -682,7 +683,7 @@ Test::requireThatAttributesAreUsed() addField(s, "bg", schema::DataType::INT32, CollectionType::WEIGHTEDSET); addField(s, "bh", schema::DataType::FLOAT, CollectionType::WEIGHTEDSET); addField(s, "bi", schema::DataType::STRING, CollectionType::WEIGHTEDSET); - addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE); + addField(s, "bj", schema::DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})"); BuildContext bc(s); DBContext dc(bc._repo, getDocTypeName()); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 18235116d27..d1d08a332e3 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -285,8 +285,8 @@ SchemaContext::SchemaContext() : schema(new Schema()), builder() { - schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE)); - schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE)); + schema->addAttributeField(Schema::AttributeField("tensor", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); + schema->addAttributeField(Schema::AttributeField("tensor2", DataType::TENSOR, CollectionType::SINGLE, "tensor(x{},y{})")); addField("i1"); } diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp index 0f55a4c30de..5f93f97f165 100644 --- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp +++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h> #include <vespa/searchcore/proton/flushengine/flush_target_candidates.h> +#include <vespa/searchcore/proton/flushengine/flush_target_candidate.h> #include <vespa/searchcore/proton/flushengine/tls_stats_map.h> #include <vespa/searchcore/proton/test/dummy_flush_handler.h> #include <vespa/searchcore/proton/test/dummy_flush_target.h> @@ -21,20 +22,16 @@ struct SimpleFlushTarget : public test::DummyFlushTarget { SerialNum flushedSerial; uint64_t approxDiskBytes; - SimpleFlushTarget(const vespalib::string &name, - SerialNum flushedSerial_, - uint64_t approxDiskBytes_) - : test::DummyFlushTarget(name), - flushedSerial(flushedSerial_), - approxDiskBytes(approxDiskBytes_) - {} + double replay_operation_cost; SimpleFlushTarget(const vespalib::string &name, const Type &type, SerialNum flushedSerial_, - uint64_t approxDiskBytes_) + uint64_t approxDiskBytes_, + double replay_operation_cost_) : test::DummyFlushTarget(name, type, Component::OTHER), flushedSerial(flushedSerial_), - approxDiskBytes(approxDiskBytes_) + approxDiskBytes(approxDiskBytes_), + replay_operation_cost(replay_operation_cost_) {} virtual SerialNum getFlushedSerialNum() const override { return flushedSerial; @@ -42,6 +39,9 @@ struct SimpleFlushTarget : public test::DummyFlushTarget virtual uint64_t getApproxBytesToWriteToDisk() const override { return approxDiskBytes; } + double get_replay_operation_cost() const override { + return replay_operation_cost; + } }; class ContextsBuilder @@ -66,30 +66,35 @@ public: const vespalib::string &targetName, IFlushTarget::Type targetType, SerialNum flushedSerial, - uint64_t approxDiskBytes) { + uint64_t approxDiskBytes, + double replay_operation_cost) { IFlushHandler::SP handler = createAndGetHandler(handlerName); IFlushTarget::SP target = std::make_shared<SimpleFlushTarget>(targetName, targetType, flushedSerial, - approxDiskBytes); + approxDiskBytes, + replay_operation_cost); _result.push_back(std::make_shared<FlushContext>(handler, target, 0)); return *this; } ContextsBuilder &add(const vespalib::string &handlerName, const vespalib::string &targetName, SerialNum flushedSerial, - uint64_t approxDiskBytes) { - return add(handlerName, targetName, IFlushTarget::Type::FLUSH, flushedSerial, approxDiskBytes); + uint64_t approxDiskBytes, + double replay_operation_cost = 0.0) { + return add(handlerName, targetName, IFlushTarget::Type::FLUSH, flushedSerial, approxDiskBytes, replay_operation_cost); } ContextsBuilder &add(const vespalib::string &targetName, SerialNum flushedSerial, - uint64_t approxDiskBytes) { - return add("handler1", targetName, IFlushTarget::Type::FLUSH, flushedSerial, approxDiskBytes); + uint64_t approxDiskBytes, + double replay_operation_cost = 0.0) { + return add("handler1", targetName, IFlushTarget::Type::FLUSH, flushedSerial, approxDiskBytes, replay_operation_cost); } ContextsBuilder &addGC(const vespalib::string &targetName, SerialNum flushedSerial, - uint64_t approxDiskBytes) { - return add("handler1", targetName, IFlushTarget::Type::GC, flushedSerial, approxDiskBytes); + uint64_t approxDiskBytes, + double replay_operation_cost = 0.0) { + return add("handler1", targetName, IFlushTarget::Type::GC, flushedSerial, approxDiskBytes, replay_operation_cost); } FlushContext::List build() const { return _result; } }; @@ -99,6 +104,7 @@ class CandidatesBuilder private: const FlushContext::List *_sortedFlushContexts; size_t _numCandidates; + mutable std::vector<FlushTargetCandidate> _candidates; flushengine::TlsStats _tlsStats; Config _cfg; @@ -106,6 +112,7 @@ public: CandidatesBuilder(const FlushContext::List &sortedFlushContexts) : _sortedFlushContexts(&sortedFlushContexts), _numCandidates(sortedFlushContexts.size()), + _candidates(), _tlsStats(1000, 11, 110), _cfg(2.0, 3.0, 4.0) {} @@ -125,8 +132,16 @@ public: replayEndSerial); return *this; } + void setup_candidates() const { + _candidates.clear(); + _candidates.reserve(_sortedFlushContexts->size()); + for (const auto &flush_context : *_sortedFlushContexts) { + _candidates.emplace_back(flush_context, _tlsStats.getLastSerial(), _cfg); + } + } FlushTargetCandidates build() const { - return FlushTargetCandidates(*_sortedFlushContexts, + setup_candidates(); + return FlushTargetCandidates(_candidates, _numCandidates, _tlsStats, _cfg); @@ -196,9 +211,12 @@ struct FlushStrategyFixture { flushengine::TlsStatsMap _tlsStatsMap; PrepareRestartFlushStrategy strategy; - FlushStrategyFixture() + FlushStrategyFixture(const Config &config) : _tlsStatsMap(defaultTransactionLogStats()), - strategy(DEFAULT_CFG) + strategy(config) + {} + FlushStrategyFixture() + : FlushStrategyFixture(DEFAULT_CFG) {} FlushContext::List getFlushTargets(const FlushContext::List &targetList, const flushengine::TlsStatsMap &tlsStatsMap) const { @@ -297,6 +315,12 @@ TEST_F("require that flush targets for different flush handlers are treated inde TEST_DO(assertFlushContexts("[foo,baz,quz]", targets)); } +TEST_F("require that expensive to replay target is flushed", FlushStrategyFixture(Config(2.0, 1.0, 4.0))) +{ + FlushContext::List targets = f.getFlushTargets(ContextsBuilder(). + add("foo", 10, 249).add("bar", 60, 150).add("baz", 60, 150, 12.0).build(), f._tlsStatsMap); + TEST_DO(assertFlushContexts("[foo,baz]", targets)); +} TEST_MAIN() { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 33b9d162163..8f19d5c203b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -12,26 +12,48 @@ #include <vespa/searchcore/proton/common/attribute_updater.h> #include <vespa/searchlib/attribute/attributevector.hpp> #include <vespa/searchlib/attribute/imported_attribute_vector.h> +#include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <future> #include <vespa/log/log.h> LOG_SETUP(".proton.attribute.attribute_writer"); using namespace document; using namespace search; + +using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; +using search::tensor::PrepareResult; using vespalib::ISequencedTaskExecutor; -using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; namespace proton { using LidVector = LidVectorContext::LidVector; +namespace { + +bool +use_two_phase_put_for_attribute(const AttributeVector& attr) +{ + const auto& cfg = attr.getConfig(); + if (cfg.basicType() == search::attribute::BasicType::Type::TENSOR && + cfg.hnsw_index_params().has_value() && + cfg.hnsw_index_params().value().allow_multi_threaded_indexing()) + { + return true; + } + return false; +} + +} + AttributeWriter::WriteField::WriteField(AttributeVector &attribute) : _fieldPath(), _attribute(attribute), - _structFieldAttribute(false) + _structFieldAttribute(false), + _use_two_phase_put(use_two_phase_put_for_attribute(attribute)) { const vespalib::string &name = attribute.getName(); _structFieldAttribute = attribute::isStructFieldAttribute(name); @@ -57,11 +79,11 @@ AttributeWriter::WriteField::buildFieldPath(const DocumentType &docType) AttributeWriter::WriteContext::WriteContext(ExecutorId executorId) : _executorId(executorId), _fields(), - _hasStructFieldAttribute(false) + _hasStructFieldAttribute(false), + _use_two_phase_put(false) { } - AttributeWriter::WriteContext::WriteContext(WriteContext &&rhs) noexcept = default; AttributeWriter::WriteContext::~WriteContext() = default; @@ -75,6 +97,13 @@ AttributeWriter::WriteContext::add(AttributeVector &attr) if (_fields.back().isStructFieldAttribute()) { _hasStructFieldAttribute = true; } + if (_fields.back().use_two_phase_put()) { + // Only support for one field per context when this is true. + assert(_fields.size() == 1); + _use_two_phase_put = true; + } else { + assert(!_use_two_phase_put); + } } void @@ -113,6 +142,27 @@ applyPutToAttribute(SerialNum serialNum, const FieldValue::UP &fieldValue, Docum } void +complete_put_to_attribute(SerialNum serial_num, + uint32_t docid, + AttributeVector& attr, + const FieldValue::SP& field_value, + std::future<std::unique_ptr<PrepareResult>>& result_future, + bool immediate_commit, + AttributeWriter::OnWriteDoneType) +{ + ensureLidSpace(serial_num, docid, attr); + if (field_value.get()) { + auto result = result_future.get(); + AttributeUpdater::complete_set_value(attr, docid, *field_value, std::move(result)); + } else { + attr.clearDoc(docid); + } + if (immediate_commit) { + attr.commit(serial_num, serial_num); + } +} + +void applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommit, AttributeVector &attr, AttributeWriter::OnWriteDoneType) { @@ -148,7 +198,6 @@ applyReplayDone(uint32_t docIdLimit, AttributeVector &attr) attr.shrinkLidSpace(); } - void applyHeartBeat(SerialNum serialNum, AttributeVector &attr) { @@ -166,7 +215,6 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVec } } - void applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr) { @@ -208,7 +256,6 @@ struct BatchUpdateTask : public vespalib::Executor::Task { } } - SerialNum _serialNum; DocumentIdT _lid; bool _immediateCommit; @@ -221,6 +268,7 @@ class FieldContext vespalib::string _name; ExecutorId _executorId; AttributeVector *_attr; + bool _use_two_phase_put; public: FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr); @@ -228,13 +276,14 @@ public: bool operator<(const FieldContext &rhs) const; ExecutorId getExecutorId() const { return _executorId; } AttributeVector *getAttribute() const { return _attr; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; - FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr) : _name(attr->getName()), _executorId(writer.getExecutorId(attr->getNamePrefix())), - _attr(attr) + _attr(attr), + _use_two_phase_put(use_two_phase_put_for_attribute(*attr)) { } @@ -303,6 +352,100 @@ PutTask::run() } } + +class PreparePutTask : public vespalib::Executor::Task { +private: + const SerialNum _serial_num; + const uint32_t _docid; + AttributeVector& _attr; + FieldValue::SP _field_value; + std::promise<std::unique_ptr<PrepareResult>> _result_promise; + +public: + PreparePutTask(SerialNum serial_num_in, + uint32_t docid_in, + const AttributeWriter::WriteField& field, + std::shared_ptr<DocumentFieldExtractor> field_extractor); + ~PreparePutTask() override; + void run() override; + SerialNum serial_num() const { return _serial_num; } + uint32_t docid() const { return _docid; } + AttributeVector& attr() { return _attr; } + FieldValue::SP field_value() { return _field_value; } + std::future<std::unique_ptr<PrepareResult>> result_future() { + return _result_promise.get_future(); + } +}; + +PreparePutTask::PreparePutTask(SerialNum serial_num_in, + uint32_t docid_in, + const AttributeWriter::WriteField& field, + std::shared_ptr<DocumentFieldExtractor> field_extractor) + : _serial_num(serial_num_in), + _docid(docid_in), + _attr(field.getAttribute()), + _field_value(), + _result_promise() +{ + // Note: No need to store the field extractor as we are not extracting struct fields. + auto value = field_extractor->getFieldValue(field.getFieldPath()); + _field_value.reset(value.release()); +} + +PreparePutTask::~PreparePutTask() = default; + +void +PreparePutTask::run() +{ + if (_attr.getStatus().getLastSyncToken() < _serial_num) { + if (_field_value.get()) { + _result_promise.set_value(AttributeUpdater::prepare_set_value(_attr, _docid, *_field_value)); + } + } +} + +class CompletePutTask : public vespalib::Executor::Task { +private: + const SerialNum _serial_num; + const uint32_t _docid; + AttributeVector& _attr; + FieldValue::SP _field_value; + std::future<std::unique_ptr<PrepareResult>> _result_future; + const bool _immediate_commit; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _on_write_done; + +public: + CompletePutTask(PreparePutTask& prepare_task, + bool immediate_commit, + AttributeWriter::OnWriteDoneType on_write_done); + ~CompletePutTask() override; + void run() override; +}; + +CompletePutTask::CompletePutTask(PreparePutTask& prepare_task, + bool immediate_commit, + AttributeWriter::OnWriteDoneType on_write_done) + : _serial_num(prepare_task.serial_num()), + _docid(prepare_task.docid()), + _attr(prepare_task.attr()), + _field_value(prepare_task.field_value()), + _result_future(prepare_task.result_future()), + _immediate_commit(immediate_commit), + _on_write_done(on_write_done) +{ +} + +CompletePutTask::~CompletePutTask() = default; + +void +CompletePutTask::run() +{ + if (_attr.getStatus().getLastSyncToken() < _serial_num) { + complete_put_to_attribute(_serial_num, _docid, _attr, _field_value, _result_future, + _immediate_commit, _on_write_done); + } +} + class RemoveTask : public vespalib::Executor::Task { const AttributeWriter::WriteContext &_wc; @@ -316,7 +459,6 @@ public: void run() override; }; - RemoveTask::RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone) : _wc(wc), _serialNum(serialNum), @@ -419,13 +561,22 @@ AttributeWriter::setupWriteContexts() fieldContexts.emplace_back(_attributeFieldWriter, attr); } std::sort(fieldContexts.begin(), fieldContexts.end()); - for (auto &fc : fieldContexts) { + for (const auto& fc : fieldContexts) { + if (fc.use_two_phase_put()) { + continue; + } if (_writeContexts.empty() || (_writeContexts.back().getExecutorId() != fc.getExecutorId())) { _writeContexts.emplace_back(fc.getExecutorId()); } _writeContexts.back().add(*fc.getAttribute()); } + for (const auto& fc : fieldContexts) { + if (fc.use_two_phase_put()) { + _writeContexts.emplace_back(fc.getExecutorId()); + _writeContexts.back().add(*fc.getAttribute()); + } + } for (const auto &wc : _writeContexts) { if (wc.hasStructFieldAttribute()) { _hasStructFieldAttribute = true; @@ -452,9 +603,19 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI } auto extractor = std::make_shared<DocumentFieldExtractor>(doc); for (const auto &wc : _writeContexts) { - if (allAttributes || wc.hasStructFieldAttribute()) { - auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, onWriteDone); - _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); + if (wc.use_two_phase_put()) { + assert(wc.getFields().size() == 1); + auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor); + auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone); + // We use the local docid to create an executor id to round-robin between the threads. + _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task)); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); + } else { + if (allAttributes || wc.hasStructFieldAttribute()) { + auto putTask = std::make_unique<PutTask>(wc, serialNum, extractor, lid, immediateCommit, allAttributes, + onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); + } } } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 4a9726dd113..726379220e3 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -19,20 +19,23 @@ namespace proton { class AttributeWriter : public IAttributeWriter { private: - typedef search::AttributeVector AttributeVector; - typedef document::FieldPath FieldPath; - typedef document::DataType DataType; - typedef document::DocumentType DocumentType; - typedef document::FieldValue FieldValue; + using AttributeVector = search::AttributeVector; + using FieldPath = document::FieldPath; + using DataType = document::DataType; + using DocumentType = document::DocumentType; + using FieldValue = document::FieldValue; const IAttributeManager::SP _mgr; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; public: - class WriteField - { + /** + * Represents an attribute vector for a field and details about how to write to it. + */ + class WriteField { FieldPath _fieldPath; AttributeVector &_attribute; bool _structFieldAttribute; // in array/map of struct + bool _use_two_phase_put; public: WriteField(AttributeVector &attribute); ~WriteField(); @@ -40,12 +43,18 @@ public: const FieldPath &getFieldPath() const { return _fieldPath; } void buildFieldPath(const DocumentType &docType); bool isStructFieldAttribute() const { return _structFieldAttribute; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; - class WriteContext - { + + /** + * Represents a set of fields (as attributes) that are handled by the same write thread. + */ + class WriteContext { ExecutorId _executorId; std::vector<WriteField> _fields; bool _hasStructFieldAttribute; + // When this is true, the context only contains a single field. + bool _use_two_phase_put; public: WriteContext(ExecutorId executorId); WriteContext(WriteContext &&rhs) noexcept; @@ -56,6 +65,7 @@ public: ExecutorId getExecutorId() const { return _executorId; } const std::vector<WriteField> &getFields() const { return _fields; } bool hasStructFieldAttribute() const { return _hasStructFieldAttribute; } + bool use_two_phase_put() const { return _use_two_phase_put; } }; private: using AttrWithId = std::pair<search::AttributeVector *, ExecutorId>; @@ -103,6 +113,11 @@ public: void onReplayDone(uint32_t docIdLimit) override; bool hasStructFieldAttribute() const override; + + // Should only be used for unit testing. + const std::vector<WriteContext>& get_write_contexts() const { + return _writeContexts; + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index 0a61ec8d882..2b5f4b028dc 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -165,9 +165,15 @@ FlushableAttribute::FlushableAttribute(const AttributeVectorSP attr, _fileHeaderContext(fileHeaderContext), _attributeFieldWriter(attributeFieldWriter), _hwInfo(hwInfo), - _attrDir(attrDir) + _attrDir(attrDir), + _replay_operation_cost(0.0) { _lastStats.setPathElementsToLog(8); + auto &config = attr->getConfig(); + if (config.basicType() == search::attribute::BasicType::Type::TENSOR && + config.tensorType().is_tensor() && config.tensorType().is_dense() && config.hnsw_index_params().has_value()) { + _replay_operation_cost = 100.0; // replaying operations to hnsw index is 100 times more expensive than reading from tls + } } @@ -236,4 +242,10 @@ FlushableAttribute::getApproxBytesToWriteToDisk() const return _attr->getEstimatedSaveByteSize(); } +double +FlushableAttribute::get_replay_operation_cost() const +{ + return _replay_operation_cost; +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h index 8d807c153c0..a759bcce26e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.h @@ -38,6 +38,7 @@ private: vespalib::ISequencedTaskExecutor &_attributeFieldWriter; HwInfo _hwInfo; std::shared_ptr<AttributeDirectory> _attrDir; + double _replay_operation_cost; Task::UP internalInitFlush(SerialNum currentSerial); @@ -71,6 +72,7 @@ public: virtual Task::UP initFlush(SerialNum currentSerial) override; virtual FlushStats getLastFlushStats() const override { return _lastStats; } virtual uint64_t getApproxBytesToWriteToDisk() const override; + virtual double get_replay_operation_cost() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 7e1b1fb1e9a..4270006e301 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -27,6 +27,6 @@ vespa_add_library(searchcore_pcommon STATIC ${VESPA_STDCXX_FS_LIB} ) -if(VESPA_OS_DISTRO_COMBINED STREQUAL "rhel 8.2") +if(VESPA_OS_DISTRO_COMBINED STREQUAL "rhel 8.2" OR VESPA_OS_DISTRO_COMBINED STREQUAL "centos 8") set_source_files_properties(hw_info_sampler.cpp PROPERTIES COMPILE_FLAGS -DRHEL_8_2_KLUDGE) endif() diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp index 8fd47c17acb..d7cf6caff28 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.cpp @@ -31,6 +31,7 @@ LOG_SETUP(".proton.common.attribute_updater"); using namespace document; using vespalib::make_string; +using search::tensor::PrepareResult; using search::tensor::TensorAttribute; using search::attribute::ReferenceAttribute; @@ -471,27 +472,33 @@ AttributeUpdater::updateValue(StringAttribute & vec, uint32_t lid, const FieldVa } } +namespace { + +template <typename ExpFieldValueType> void -AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) +validate_field_value_type(const FieldValue& val, const vespalib::string& attr_type, const vespalib::string& value_type) { - if (!val.inherits(PredicateFieldValue::classId)) { + if (!val.inherits(ExpFieldValueType::classId)) { throw UpdateException( - make_string("PredicateAttribute must be updated with " - "PredicateFieldValues.")); + make_string("%s must be updated with %s, but was '%s'", + attr_type.c_str(), value_type.c_str(), val.toString(false).c_str())); } +} + +} + +void +AttributeUpdater::updateValue(PredicateAttribute &vec, uint32_t lid, const FieldValue &val) +{ + validate_field_value_type<PredicateFieldValue>(val, "PredicateAttribute", "PredicateFieldValue"); vec.updateValue(lid, static_cast<const PredicateFieldValue &>(val)); } void AttributeUpdater::updateValue(TensorAttribute &vec, uint32_t lid, const FieldValue &val) { - if (!val.inherits(TensorFieldValue::classId)) { - throw UpdateException( - make_string("TensorAttribute must be updated with " - "TensorFieldValues.")); - } - const auto &tensor = static_cast<const TensorFieldValue &>(val). - getAsTensorPtr(); + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto &tensor = static_cast<const TensorFieldValue &>(val).getAsTensorPtr(); if (tensor) { vec.setTensor(lid, *tensor); } else { @@ -506,7 +513,7 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field vec.clearDoc(lid); throw UpdateException( make_string("ReferenceAttribute must be updated with " - "ReferenceFieldValues.")); + "ReferenceFieldValue, but was '%s'", val.toString(false).c_str())); } const auto &reffv = static_cast<const ReferenceFieldValue &>(val); if (reffv.hasValidDocumentId()) { @@ -516,4 +523,57 @@ AttributeUpdater::updateValue(ReferenceAttribute &vec, uint32_t lid, const Field } } +namespace { + +void +validate_tensor_attribute_type(AttributeVector& attr) +{ + const auto& info = attr.getClass(); + if (!info.inherits(TensorAttribute::classId)) { + throw UpdateException( + make_string("Expected attribute vector '%s' to be a TensorAttribute, but was '%s'", + attr.getName().c_str(), info.name())); + } +} + +std::unique_ptr<PrepareResult> +prepare_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val) +{ + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); + if (tensor) { + return attr.prepare_set_tensor(docid, *tensor); + } + return std::unique_ptr<PrepareResult>(); +} + +void +complete_set_tensor(TensorAttribute& attr, uint32_t docid, const FieldValue& val, std::unique_ptr<PrepareResult> prepare_result) +{ + validate_field_value_type<TensorFieldValue>(val, "TensorAttribute", "TensorFieldValue"); + const auto& tensor = static_cast<const TensorFieldValue&>(val).getAsTensorPtr(); + if (tensor) { + attr.complete_set_tensor(docid, *tensor, std::move(prepare_result)); + } else { + attr.clearDoc(docid); + } +} + +} + +std::unique_ptr<PrepareResult> +AttributeUpdater::prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val) +{ + validate_tensor_attribute_type(attr); + return prepare_set_tensor(static_cast<TensorAttribute&>(attr), docid, val); +} + +void +AttributeUpdater::complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, + std::unique_ptr<PrepareResult> prepare_result) +{ + validate_tensor_attribute_type(attr); + complete_set_tensor(static_cast<TensorAttribute&>(attr), docid, val, std::move(prepare_result)); +} + } // namespace search diff --git a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h index 01be6299692..32d14f6dd5a 100644 --- a/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h +++ b/searchcore/src/vespa/searchcore/proton/common/attribute_updater.h @@ -10,7 +10,10 @@ namespace search { class PredicateAttribute; -namespace tensor { class TensorAttribute; } +namespace tensor { + class PrepareResult; + class TensorAttribute; +} namespace attribute {class ReferenceAttribute; } VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); @@ -20,14 +23,18 @@ VESPA_DEFINE_EXCEPTION(UpdateException, vespalib::Exception); */ class AttributeUpdater { using Field = document::Field; - using FieldValue = document::FieldValue; using FieldUpdate = document::FieldUpdate; + using FieldValue = document::FieldValue; using ValueUpdate = document::ValueUpdate; public: static void handleUpdate(AttributeVector & vec, uint32_t lid, const FieldUpdate & upd); static void handleValue(AttributeVector & vec, uint32_t lid, const FieldValue & val); + static std::unique_ptr<tensor::PrepareResult> prepare_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val); + static void complete_set_value(AttributeVector& attr, uint32_t docid, const FieldValue& val, + std::unique_ptr<tensor::PrepareResult> prepare_result); + private: template <typename V> static void handleUpdate(V & vec, uint32_t lid, const ValueUpdate & upd); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt index ecd90d8a992..340007f4513 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(searchcore_flushengine STATIC flushcontext.cpp flushengine.cpp flush_engine_explorer.cpp + flush_target_candidate.cpp flush_target_candidates.cpp flushtargetproxy.cpp flushtask.cpp diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.cpp new file mode 100644 index 00000000000..6be6d31372a --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.cpp @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "flush_target_candidate.h" +#include "flushcontext.h" + +namespace proton { + +FlushTargetCandidate::FlushTargetCandidate(std::shared_ptr<FlushContext> flush_context, search::SerialNum current_serial, const Config &cfg) + : _flush_context(std::move(flush_context)), + _replay_operation_cost(_flush_context->getTarget()->get_replay_operation_cost() * cfg.tlsReplayOperationCost), + _flushed_serial(_flush_context->getTarget()->getFlushedSerialNum()), + _current_serial(current_serial), + _replay_cost(_replay_operation_cost * (_current_serial - _flushed_serial)), + _approx_bytes_to_write_to_disk(_flush_context->getTarget()->getApproxBytesToWriteToDisk()), + _write_cost(_approx_bytes_to_write_to_disk * cfg.flushTargetWriteCost), + _always_flush(_replay_cost >= _write_cost) +{ +} + +FlushTargetCandidate::~FlushTargetCandidate() = default; + +} diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.h b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.h new file mode 100644 index 00000000000..5920fff6942 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidate.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "prepare_restart_flush_strategy.h" +#include <memory> +#include <vespa/searchlib/common/serialnum.h> + +namespace proton { + +class FlushContext; + +/** + * Class describing a flush target candidate for the prepare restart flush strategy. + */ +class FlushTargetCandidate +{ + std::shared_ptr<FlushContext> _flush_context; + double _replay_operation_cost; + search::SerialNum _flushed_serial; + search::SerialNum _current_serial; + double _replay_cost; + uint64_t _approx_bytes_to_write_to_disk; + double _write_cost; + bool _always_flush; + + using Config = PrepareRestartFlushStrategy::Config; +public: + FlushTargetCandidate(std::shared_ptr<FlushContext> flush_context, search::SerialNum current_serial, const Config &cfg); + ~FlushTargetCandidate(); + const std::shared_ptr<FlushContext> &get_flush_context() const { return _flush_context; } + search::SerialNum get_flushed_serial() const { return _flushed_serial; } + double get_write_cost() const { return _write_cost; } + bool get_always_flush() const { return _always_flush; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp index 0051c209ef9..f71da453559 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "flush_target_candidates.h" +#include "flush_target_candidate.h" #include "tls_stats.h" namespace proton { @@ -13,17 +14,17 @@ using TlsReplayCost = FlushTargetCandidates::TlsReplayCost; namespace { SerialNum -calculateReplayStartSerial(const FlushContext::List &sortedFlushContexts, - size_t numCandidates, +calculateReplayStartSerial(vespalib::ConstArrayRef<FlushTargetCandidate> candidates, + size_t num_candidates, const flushengine::TlsStats &tlsStats) { - if (numCandidates == 0) { + if (num_candidates == 0) { return tlsStats.getFirstSerial(); } - if (numCandidates == sortedFlushContexts.size()) { + if (num_candidates == candidates.size()) { return tlsStats.getLastSerial() + 1; } - return sortedFlushContexts[numCandidates]->getTarget()->getFlushedSerialNum() + 1; + return candidates[num_candidates].get_flushed_serial() + 1; } TlsReplayCost @@ -44,43 +45,44 @@ calculateTlsReplayCost(const flushengine::TlsStats &tlsStats, } double -calculateFlushTargetsWriteCost(const FlushContext::List &sortedFlushContexts, - size_t numCandidates, - const Config &cfg) +calculateFlushTargetsWriteCost(vespalib::ConstArrayRef<FlushTargetCandidate> candidates, + size_t num_candidates) { double result = 0; - for (size_t i = 0; i < numCandidates; ++i) { - const auto &flushContext = sortedFlushContexts[i]; - result += (flushContext->getTarget()->getApproxBytesToWriteToDisk() * - cfg.flushTargetWriteCost); + for (size_t i = 0; i < num_candidates; ++i) { + result += candidates[i].get_write_cost(); } return result; } } -FlushTargetCandidates::FlushTargetCandidates(const FlushContext::List &sortedFlushContexts, - size_t numCandidates, +FlushTargetCandidates::FlushTargetCandidates(vespalib::ConstArrayRef<FlushTargetCandidate> candidates, + size_t num_candidates, const flushengine::TlsStats &tlsStats, const Config &cfg) - : _sortedFlushContexts(&sortedFlushContexts), - _numCandidates(numCandidates), + : _candidates(candidates), + _num_candidates(std::min(num_candidates, _candidates.size())), _tlsReplayCost(calculateTlsReplayCost(tlsStats, cfg, - calculateReplayStartSerial(sortedFlushContexts, - numCandidates, + calculateReplayStartSerial(_candidates, + _num_candidates, tlsStats))), - _flushTargetsWriteCost(calculateFlushTargetsWriteCost(sortedFlushContexts, - numCandidates, - cfg)) + _flushTargetsWriteCost(calculateFlushTargetsWriteCost(_candidates, + _num_candidates)) { } FlushContext::List FlushTargetCandidates::getCandidates() const { - FlushContext::List result(_sortedFlushContexts->begin(), - _sortedFlushContexts->begin() + _numCandidates); + FlushContext::List result; + result.reserve(_num_candidates); + for (const auto &candidate : _candidates) { + if (result.size() < _num_candidates || candidate.get_always_flush()) { + result.emplace_back(candidate.get_flush_context()); + } + } return result; } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h index ea09989de31..2979173331c 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h @@ -2,11 +2,14 @@ #pragma once #include "prepare_restart_flush_strategy.h" +#include <vespa/vespalib/util/arrayref.h> namespace proton { namespace flushengine { class TlsStats; } +class FlushTargetCandidate; + /** * A set of flush targets that are candidates to be flushed. * @@ -27,8 +30,8 @@ public: double totalCost() const { return bytesCost + operationsCost; } }; private: - const FlushContext::List *_sortedFlushContexts; // NOTE: ownership is handled outside - size_t _numCandidates; + vespalib::ConstArrayRef<FlushTargetCandidate> _candidates; // NOTE: ownership is handled outside + size_t _num_candidates; TlsReplayCost _tlsReplayCost; double _flushTargetsWriteCost; @@ -37,8 +40,8 @@ private: public: using UP = std::unique_ptr<FlushTargetCandidates>; - FlushTargetCandidates(const FlushContext::List &sortedFlushContexts, - size_t numCandidates, + FlushTargetCandidates(vespalib::ConstArrayRef<FlushTargetCandidate> candidates, + size_t num_candidates, const flushengine::TlsStats &tlsStats, const Config &cfg); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp index 6cfb8cb6c3d..21f9c8465b0 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp @@ -2,6 +2,7 @@ #include "prepare_restart_flush_strategy.h" #include "flush_target_candidates.h" +#include "flush_target_candidate.h" #include "tls_stats_map.h" #include <sstream> #include <algorithm> @@ -70,17 +71,15 @@ flatten(const FlushContextsMap &flushContextsPerHandler) } void -sortByOldestFlushedSerialNumber(FlushContext::List &flushContexts) +sortByOldestFlushedSerialNumber(std::vector<FlushTargetCandidate>& candidates) { - std::sort(flushContexts.begin(), flushContexts.end(), - [](const auto &lhs, const auto &rhs) { - if (lhs->getTarget()->getFlushedSerialNum() == - rhs->getTarget()->getFlushedSerialNum()) { - return lhs->getName() < rhs->getName(); - } - return lhs->getTarget()->getFlushedSerialNum() < - rhs->getTarget()->getFlushedSerialNum(); - }); + std::sort(candidates.begin(), candidates.end(), + [](const auto &lhs, const auto &rhs) { + if (lhs.get_flushed_serial() == rhs.get_flushed_serial()) { + return lhs.get_flush_context()->getName() < rhs.get_flush_context()->getName(); + } + return lhs.get_flushed_serial() < rhs.get_flushed_serial(); + }); } vespalib::string @@ -103,12 +102,16 @@ findBestTargetsToFlush(const FlushContext::List &unsortedFlushContexts, const flushengine::TlsStats &tlsStats, const Config &cfg) { - FlushContext::List sortedFlushContexts = unsortedFlushContexts; - sortByOldestFlushedSerialNumber(sortedFlushContexts); + std::vector<FlushTargetCandidate> candidates; + candidates.reserve(unsortedFlushContexts.size()); + for (const auto &flush_context : unsortedFlushContexts) { + candidates.emplace_back(flush_context, tlsStats.getLastSerial(), cfg); + } + sortByOldestFlushedSerialNumber(candidates); - FlushTargetCandidates bestSet(sortedFlushContexts, 0, tlsStats, cfg); - for (size_t numCandidates = 1; numCandidates <= sortedFlushContexts.size(); ++numCandidates) { - FlushTargetCandidates nextSet(sortedFlushContexts, numCandidates, tlsStats, cfg); + FlushTargetCandidates bestSet(candidates, 0, tlsStats, cfg); + for (size_t numCandidates = 1; numCandidates <= candidates.size(); ++numCandidates) { + FlushTargetCandidates nextSet(candidates, numCandidates, tlsStats, cfg); LOG(debug, "findBestTargetsToFlush(): Created candidate set: " "flushTargets=[%s], tlsReplayBytesCost=%f, tlsReplayOperationsCost=%f, flushTargetsWriteCost=%f, totalCost=%f", toString(nextSet.getCandidates()).c_str(), diff --git a/searchcore/src/vespa/searchcore/proton/matching/query.cpp b/searchcore/src/vespa/searchcore/proton/matching/query.cpp index 8fd686e235d..5213a2b9230 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/query.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/query.cpp @@ -213,6 +213,8 @@ Query::handle_global_filters(uint32_t docid_limit) // optimized order may change after accounting for global filter: _blueprint = Blueprint::optimize(std::move(_blueprint)); LOG(debug, "blueprint after handle_global_filters:\n%s\n", _blueprint->asString().c_str()); + // strictness may change if optimized order changed: + fetchPostings(); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 719ba359ccf..962ee65c10d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -302,8 +302,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _warmupExecutor = std::make_unique<vespalib::ThreadStackExecutor>(4, 128*1024, index_warmup_executor); const size_t sharedThreads = deriveCompactionCompressionThreads(protonConfig, hwInfo.cpu()); - _sharedExecutor = std::make_unique<vespalib::BlockingThreadStackExecutor>(sharedThreads, 128*1024, sharedThreads*16, proton_shared_executor); - _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(*_sharedExecutor); + _sharedExecutor = std::make_shared<vespalib::BlockingThreadStackExecutor>(sharedThreads, 128*1024, sharedThreads*16, proton_shared_executor); + _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_sharedExecutor); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024, initialize_executor); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index d0b76bd5804..d5c1a8b7b78 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -112,7 +112,7 @@ private: ProtonConfigurer _protonConfigurer; ProtonConfigFetcher _protonConfigFetcher; std::unique_ptr<vespalib::ThreadStackExecutorBase> _warmupExecutor; - std::unique_ptr<vespalib::ThreadStackExecutorBase> _sharedExecutor; + std::shared_ptr<vespalib::ThreadStackExecutorBase> _sharedExecutor; vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h index 3e49bb449ff..8e5d3018532 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h @@ -11,16 +11,26 @@ namespace proton::test { class MockAttributeManager : public IAttributeManager { private: search::attribute::test::MockAttributeManager _mock; + std::vector<search::AttributeVector*> _writables; std::unique_ptr<ImportedAttributesRepo> _importedAttributes; + vespalib::ISequencedTaskExecutor* _writer; public: MockAttributeManager() : _mock(), - _importedAttributes() + _writables(), + _importedAttributes(), + _writer() {} - void addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { + search::AttributeVector::SP addAttribute(const vespalib::string &name, const search::AttributeVector::SP &attr) { _mock.addAttribute(name, attr); + _writables.push_back(attr.get()); + return attr; + } + + void set_writer(vespalib::ISequencedTaskExecutor& writer) { + _writer = &writer; } search::AttributeGuard::UP getAttribute(const vespalib::string &name) const override { @@ -56,13 +66,18 @@ public: HDR_ABORT("should not be reached"); } vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const override { - HDR_ABORT("should not be reached"); - } - search::AttributeVector *getWritableAttribute(const vespalib::string &) const override { + assert(_writer != nullptr); + return *_writer; + } + search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override { + auto attr = getAttribute(name); + if (attr) { + return attr->get(); + } return nullptr; } const std::vector<search::AttributeVector *> &getWritableAttributes() const override { - HDR_ABORT("should not be reached"); + return _writables; } void asyncForEachAttribute(std::shared_ptr<IConstAttributeFunctor>) const override { } |