summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_test.cpp36
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h1
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h24
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h31
5 files changed, 75 insertions, 24 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp
index 3c128e35d08..6a12d2daa20 100644
--- a/searchcore/src/tests/proton/attribute/attribute_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp
@@ -42,9 +42,9 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/test/insertion_operators.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/foreground_thread_executor.h>
#include <vespa/vespalib/util/foregroundtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutorobserver.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP("attribute_test");
@@ -79,6 +79,7 @@ using search::tensor::TensorAttribute;
using search::test::DirectoryHandler;
using std::string;
using vespalib::ForegroundTaskExecutor;
+using vespalib::ForegroundThreadExecutor;
using vespalib::SequencedTaskExecutorObserver;
using vespalib::eval::TensorSpec;
using vespalib::eval::ValueType;
@@ -122,12 +123,12 @@ fillAttribute(const AttributeVector::SP &attr, uint32_t from, uint32_t to, int64
const std::shared_ptr<IDestructorCallback> emptyCallback;
-
class AttributeWriterTest : public ::testing::Test {
public:
DirectoryHandler _dirHandler;
std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal;
std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter;
+ ForegroundThreadExecutor _shared;
std::shared_ptr<MockAttributeManager> _mgr;
std::unique_ptr<AttributeWriter> _aw;
@@ -135,6 +136,7 @@ public:
: _dirHandler(test_dir),
_attributeFieldWriterReal(),
_attributeFieldWriter(),
+ _shared(),
_mgr(),
_aw()
{
@@ -147,6 +149,7 @@ public:
_attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal);
_mgr = std::make_shared<MockAttributeManager>();
_mgr->set_writer(*_attributeFieldWriter);
+ _mgr->set_shared_executor(_shared);
allocAttributeWriter();
}
void allocAttributeWriter() {
@@ -574,7 +577,7 @@ public:
DirectoryHandler _dirHandler;
DummyFileHeaderContext _fileHeaderContext;
ForegroundTaskExecutor _attributeFieldWriter;
- vespalib::ThreadStackExecutor _shared;
+ ForegroundThreadExecutor _shared;
HwInfo _hwInfo;
proton::AttributeManager::SP _baseMgr;
FilterAttributeManager _filterMgr;
@@ -583,7 +586,7 @@ public:
: _dirHandler(test_dir),
_fileHeaderContext(),
_attributeFieldWriter(),
- _shared(1, 128 * 1024),
+ _shared(),
_hwInfo(),
_baseMgr(new proton::AttributeManager(test_dir, "test.subdb",
TuneFileAttributes(),
@@ -891,6 +894,11 @@ public:
startAttributeField("a1").
addTensor(std::unique_ptr<vespalib::tensor::Tensor>()).endField().endDocument();
}
+ void expect_shared_executor_tasks(size_t exp_accepted_tasks) { // checks the task counters reported by the _shared executor
+ auto stats = _shared.getStats();
+ EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks); // accepted count must match what the caller expects so far
+ EXPECT_EQ(0, stats.rejectedTasks); // no task is ever expected to be rejected in these tests
+ }
};
TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attribute)
@@ -899,16 +907,13 @@ TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attr
put(1, *doc, 1);
expect_tensor_attr_calls(1, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
put(2, *doc, 2);
expect_tensor_attr_calls(2, 2);
- assertExecuteHistory({1, 0, 0, 0});
-
- put(3, *doc, 3);
- expect_tensor_attr_calls(3, 3);
- // Note that the prepare step is executed round-robin between the 2 threads.
- assertExecuteHistory({1, 0, 0, 0, 1, 0});
+ expect_shared_executor_tasks(2);
+ assertExecuteHistory({0, 0});
}
TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_attribute)
@@ -917,7 +922,8 @@ TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_a
attr->commit(7, 7);
put(7, *doc, 1);
expect_tensor_attr_calls(0, 0);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set)
@@ -925,7 +931,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set)
auto doc = make_no_field_doc();
put(1, *doc, 1);
expect_tensor_attr_calls(0, 0, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
@@ -933,7 +940,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
auto doc = make_no_tensor_doc();
put(1, *doc, 1);
expect_tensor_attr_calls(0, 0, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index a91b60d5166..a49b27caf36 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -12,9 +12,10 @@
#include <vespa/searchcore/proton/common/attribute_updater.h>
#include <vespa/searchlib/attribute/attributevector.hpp>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
-#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
#include <vespa/log/log.h>
@@ -607,8 +608,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI
assert(wc.getFields().size() == 1);
auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor);
auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone);
- // We use the local docid to create an executor id to round-robin between the threads.
- _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task));
+ _shared_executor.execute(std::move(prepare_task));
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task));
} else {
if (allAttributes || wc.hasStructFieldAttribute()) {
@@ -633,6 +633,7 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immed
AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr)
: _mgr(std::move(mgr)),
_attributeFieldWriter(_mgr->getAttributeFieldWriter()),
+ _shared_executor(_mgr->get_shared_executor()),
_writeContexts(),
_dataType(nullptr),
_hasStructFieldAttribute(false),
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 726379220e3..eaf8abe4872 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -26,6 +26,7 @@ private:
using FieldValue = document::FieldValue;
const IAttributeManager::SP _mgr;
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
+ vespalib::ThreadExecutor& _shared_executor;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
public:
/**
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index 2ace9a8ac6b..18c376a4c2b 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -8,26 +8,36 @@ namespace vespalib { class ISequencedTaskExecutor; }
namespace searchcorespi::index {
/**
- * Interface for the thread model used for write tasks.
+ * Interface for the thread model used for write tasks for a single document database.
*
* We have multiple write threads:
*
- * 1. The master write thread used for the majority of write tasks.
+ * 1. The "master" write thread used for the majority of write tasks.
*
- * 2. The index write thread used for doing changes to the memory
+ * 2. The "index" write thread used for doing changes to the memory
* index, either directly (for data not bound to a field) or via
* index field inverter executor or index field writer executor.
*
- * 3. The index field inverter executor is used to populate field
+ * 3. The "summary" thread is used for doing changes to the document store.
+ *
+ * 4. The "index field inverter" executor is used to populate field
* inverters with data from document fields. Scheduled tasks for
* the same field are executed in sequence.
*
- * 4. The index field writer executor is used to sort data in field
+ * 5. The "index field writer" executor is used to sort data in field
* inverters before pushing the data to the memory field indexes.
* Scheduled tasks for the same field are executed in sequence.
*
- * The master write thread is always the one giving tasks to the index
- * write thread.
+ * 6. The "attribute field writer" executor is used to write data to attribute vectors.
+ * Each attribute is always handled by the same thread,
+ * and scheduled tasks for the same attribute are executed in sequence.
+ *
+ * The master write thread is always the one giving tasks to the other write threads above.
+ *
+ * In addition, this interface exposes the "shared" executor that is used by all document databases.
+ * Among other things, it is used for compressing / de-compressing documents in the document store,
+ * merging files as part of disk index fusion, and running the prepare step when doing two-phase
+ * puts against a tensor attribute with an HNSW index.
*
* The index write thread extracts fields from documents and gives
* task to the index field inverter executor and the index field
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
new file mode 100644
index 00000000000..575552971fa
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -0,0 +1,31 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Implementation of the ThreadExecutor interface that runs every task synchronously in the calling thread.
+ */
+class ForegroundThreadExecutor : public vespalib::ThreadExecutor {
+private:
+ std::atomic<size_t> _accepted; // number of tasks that execute() has run to completion
+
+public:
+ ForegroundThreadExecutor() : _accepted(0) { }
+ Task::UP execute(Task::UP task) override { // runs the task inline; it has finished by the time this returns
+ task->run();
+ ++_accepted; // incremented only after run() has returned
+ return Task::UP(); // always an empty pointer (presumably meaning "not rejected" - confirm against ThreadExecutor)
+ }
+ size_t getNumThreads() const override { return 0; } // always reports zero threads
+ Stats getStats() override { // default queue-size stats, the accepted counter, and a constant 0 (presumably rejected tasks - confirm)
+ return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ }
+ virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } // the task limit is ignored
+};
+
+}