diff options
5 files changed, 75 insertions, 24 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp index 3c128e35d08..6a12d2daa20 100644 --- a/searchcore/src/tests/proton/attribute/attribute_test.cpp +++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp @@ -42,9 +42,9 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/test/insertion_operators.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/foreground_thread_executor.h> #include <vespa/vespalib/util/foregroundtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutorobserver.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> LOG_SETUP("attribute_test"); @@ -79,6 +79,7 @@ using search::tensor::TensorAttribute; using search::test::DirectoryHandler; using std::string; using vespalib::ForegroundTaskExecutor; +using vespalib::ForegroundThreadExecutor; using vespalib::SequencedTaskExecutorObserver; using vespalib::eval::TensorSpec; using vespalib::eval::ValueType; @@ -122,12 +123,12 @@ fillAttribute(const AttributeVector::SP &attr, uint32_t from, uint32_t to, int64 const std::shared_ptr<IDestructorCallback> emptyCallback; - class AttributeWriterTest : public ::testing::Test { public: DirectoryHandler _dirHandler; std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal; std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter; + ForegroundThreadExecutor _shared; std::shared_ptr<MockAttributeManager> _mgr; std::unique_ptr<AttributeWriter> _aw; @@ -135,6 +136,7 @@ public: : _dirHandler(test_dir), _attributeFieldWriterReal(), _attributeFieldWriter(), + _shared(), _mgr(), _aw() { @@ -147,6 +149,7 @@ public: _attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal); _mgr = std::make_shared<MockAttributeManager>(); _mgr->set_writer(*_attributeFieldWriter); + _mgr->set_shared_executor(_shared); allocAttributeWriter(); } void allocAttributeWriter() { @@ -574,7 +577,7 @@ public: DirectoryHandler _dirHandler; DummyFileHeaderContext _fileHeaderContext; ForegroundTaskExecutor _attributeFieldWriter; - vespalib::ThreadStackExecutor _shared; + ForegroundThreadExecutor _shared; HwInfo _hwInfo; proton::AttributeManager::SP _baseMgr; FilterAttributeManager _filterMgr; @@ -583,7 +586,7 @@ public: : _dirHandler(test_dir), _fileHeaderContext(), _attributeFieldWriter(), - _shared(1, 128 * 1024), + _shared(), _hwInfo(), _baseMgr(new proton::AttributeManager(test_dir, "test.subdb", TuneFileAttributes(), @@ -891,6 +894,11 @@ public: startAttributeField("a1"). addTensor(std::unique_ptr<vespalib::tensor::Tensor>()).endField().endDocument(); } + void expect_shared_executor_tasks(size_t exp_accepted_tasks) { + auto stats = _shared.getStats(); + EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks); + EXPECT_EQ(0, stats.rejectedTasks); + } }; TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attribute) @@ -899,16 +907,13 @@ TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attr put(1, *doc, 1); expect_tensor_attr_calls(1, 1); - assertExecuteHistory({1, 0}); + expect_shared_executor_tasks(1); + assertExecuteHistory({0}); put(2, *doc, 2); expect_tensor_attr_calls(2, 2); - assertExecuteHistory({1, 0, 0, 0}); - - put(3, *doc, 3); - expect_tensor_attr_calls(3, 3); - // Note that the prepare step is executed round-robin between the 2 threads. - assertExecuteHistory({1, 0, 0, 0, 1, 0}); + expect_shared_executor_tasks(2); + assertExecuteHistory({0, 0}); } TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_attribute) @@ -917,7 +922,8 @@ TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_a attr->commit(7, 7); put(7, *doc, 1); expect_tensor_attr_calls(0, 0); - assertExecuteHistory({1, 0}); + expect_shared_executor_tasks(1); + assertExecuteHistory({0}); } TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set) @@ -925,7 +931,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set) auto doc = make_no_field_doc(); put(1, *doc, 1); expect_tensor_attr_calls(0, 0, 1); - assertExecuteHistory({1, 0}); + expect_shared_executor_tasks(1); + assertExecuteHistory({0}); } TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set) @@ -933,7 +940,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set) auto doc = make_no_tensor_doc(); put(1, *doc, 1); expect_tensor_attr_calls(0, 0, 1); - assertExecuteHistory({1, 0}); + expect_shared_executor_tasks(1); + assertExecuteHistory({0}); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index a91b60d5166..a49b27caf36 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -12,9 +12,10 @@ #include <vespa/searchcore/proton/common/attribute_updater.h> #include <vespa/searchlib/attribute/attributevector.hpp> #include <vespa/searchlib/attribute/imported_attribute_vector.h> -#include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/threadexecutor.h> #include <future> #include <vespa/log/log.h> @@ -607,8 +608,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI assert(wc.getFields().size() == 1); auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor); auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone); - // We use the local docid to create an executor id to round-robin between the threads. - _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task)); + _shared_executor.execute(std::move(prepare_task)); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); } else { if (allAttributes || wc.hasStructFieldAttribute()) { @@ -633,6 +633,7 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immed AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr) : _mgr(std::move(mgr)), _attributeFieldWriter(_mgr->getAttributeFieldWriter()), + _shared_executor(_mgr->get_shared_executor()), _writeContexts(), _dataType(nullptr), _hasStructFieldAttribute(false), diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 726379220e3..eaf8abe4872 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -26,6 +26,7 @@ private: using FieldValue = document::FieldValue; const IAttributeManager::SP _mgr; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; + vespalib::ThreadExecutor& _shared_executor; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; public: /** diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index 2ace9a8ac6b..18c376a4c2b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -8,26 +8,36 @@ namespace vespalib { class ISequencedTaskExecutor; } namespace searchcorespi::index { /** - * Interface for the thread model used for write tasks. + * Interface for the thread model used for write tasks for a single document database. * * We have multiple write threads: * - * 1. The master write thread used for the majority of write tasks. + * 1. The "master" write thread used for the majority of write tasks. * - * 2. The index write thread used for doing changes to the memory + * 2. The "index" write thread used for doing changes to the memory * index, either directly (for data not bound to a field) or via * index field inverter executor or index field writer executor. * - * 3. The index field inverter executor is used to populate field + * 3. The "summary" thread is used for doing changes to the document store. + * + * 4. The "index field inverter" executor is used to populate field * inverters with data from document fields. Scheduled tasks for * the same field are executed in sequence. * - * 4. The index field writer executor is used to sort data in field + * 5. The "index field writer" executor is used to sort data in field * inverters before pushing the data to the memory field indexes. * Scheduled tasks for the same field are executed in sequence. * - * The master write thread is always the one giving tasks to the index - * write thread. + * 6. The "attribute field writer" executor is used to write data to attribute vectors. + * Each attribute is always handled by the same thread, + * and scheduled tasks for the same attribute are executed in sequence. + * + * The master write thread is always the one giving tasks to the other write threads above. + * + * In addition this interface exposes the "shared" executor that is used by all document databases. + * This is among others used for compressing / de-compressing documents in the document store, + * merging files as part of disk index fusion, and running the prepare step when doing two-phase + * puts against a tensor attribute with a HNSW index. * * The index write thread extracts fields from documents and gives * task to the index field inverter executor and the index field diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h new file mode 100644 index 00000000000..575552971fa --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -0,0 +1,31 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadexecutor.h> +#include <atomic> + +namespace vespalib { + +/** + * Implementation of the ThreadExecutor interface that runs all tasks in the foreground by the calling thread. + */ +class ForegroundThreadExecutor : public vespalib::ThreadExecutor { +private: + std::atomic<size_t> _accepted; + +public: + ForegroundThreadExecutor() : _accepted(0) { } + Task::UP execute(Task::UP task) override { + task->run(); + ++_accepted; + return Task::UP(); + } + size_t getNumThreads() const override { return 0; } + Stats getStats() override { + return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); + } + virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } +}; + +} |