summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-06-25 15:31:41 +0000
committerGeir Storli <geirst@verizonmedia.com>2020-06-25 15:53:38 +0000
commit4b5340c5fa6d78af845cc19d1667818fb7d6bf70 (patch)
tree6ae44eb8818fa4be4819faf1e44186ded77e5c6f
parent4350799ad50b3377d07857838fab9aac3926b8e4 (diff)
Run prepare step of two-phase puts in the shared executor instead of the attribute field executor.
Benchmarking has shown that we get an uneven balance between the underlying threads when using the attribute field executor. This is a sequenced task executor, where each underlying thread only is allowed to handle a subset of the prepare tasks (based on the local document id). Using the shared executor should give an even balance.
-rw-r--r--searchcore/src/tests/proton/attribute/attribute_test.cpp36
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h1
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h24
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h31
5 files changed, 75 insertions, 24 deletions
diff --git a/searchcore/src/tests/proton/attribute/attribute_test.cpp b/searchcore/src/tests/proton/attribute/attribute_test.cpp
index 3c128e35d08..6a12d2daa20 100644
--- a/searchcore/src/tests/proton/attribute/attribute_test.cpp
+++ b/searchcore/src/tests/proton/attribute/attribute_test.cpp
@@ -42,9 +42,9 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/test/insertion_operators.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/foreground_thread_executor.h>
#include <vespa/vespalib/util/foregroundtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutorobserver.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP("attribute_test");
@@ -79,6 +79,7 @@ using search::tensor::TensorAttribute;
using search::test::DirectoryHandler;
using std::string;
using vespalib::ForegroundTaskExecutor;
+using vespalib::ForegroundThreadExecutor;
using vespalib::SequencedTaskExecutorObserver;
using vespalib::eval::TensorSpec;
using vespalib::eval::ValueType;
@@ -122,12 +123,12 @@ fillAttribute(const AttributeVector::SP &attr, uint32_t from, uint32_t to, int64
const std::shared_ptr<IDestructorCallback> emptyCallback;
-
class AttributeWriterTest : public ::testing::Test {
public:
DirectoryHandler _dirHandler;
std::unique_ptr<ForegroundTaskExecutor> _attributeFieldWriterReal;
std::unique_ptr<SequencedTaskExecutorObserver> _attributeFieldWriter;
+ ForegroundThreadExecutor _shared;
std::shared_ptr<MockAttributeManager> _mgr;
std::unique_ptr<AttributeWriter> _aw;
@@ -135,6 +136,7 @@ public:
: _dirHandler(test_dir),
_attributeFieldWriterReal(),
_attributeFieldWriter(),
+ _shared(),
_mgr(),
_aw()
{
@@ -147,6 +149,7 @@ public:
_attributeFieldWriter = std::make_unique<SequencedTaskExecutorObserver>(*_attributeFieldWriterReal);
_mgr = std::make_shared<MockAttributeManager>();
_mgr->set_writer(*_attributeFieldWriter);
+ _mgr->set_shared_executor(_shared);
allocAttributeWriter();
}
void allocAttributeWriter() {
@@ -574,7 +577,7 @@ public:
DirectoryHandler _dirHandler;
DummyFileHeaderContext _fileHeaderContext;
ForegroundTaskExecutor _attributeFieldWriter;
- vespalib::ThreadStackExecutor _shared;
+ ForegroundThreadExecutor _shared;
HwInfo _hwInfo;
proton::AttributeManager::SP _baseMgr;
FilterAttributeManager _filterMgr;
@@ -583,7 +586,7 @@ public:
: _dirHandler(test_dir),
_fileHeaderContext(),
_attributeFieldWriter(),
- _shared(1, 128 * 1024),
+ _shared(),
_hwInfo(),
_baseMgr(new proton::AttributeManager(test_dir, "test.subdb",
TuneFileAttributes(),
@@ -891,6 +894,11 @@ public:
startAttributeField("a1").
addTensor(std::unique_ptr<vespalib::tensor::Tensor>()).endField().endDocument();
}
+ void expect_shared_executor_tasks(size_t exp_accepted_tasks) {
+ auto stats = _shared.getStats();
+ EXPECT_EQ(exp_accepted_tasks, stats.acceptedTasks);
+ EXPECT_EQ(0, stats.rejectedTasks);
+ }
};
TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attribute)
@@ -899,16 +907,13 @@ TEST_F(TwoPhasePutTest, handles_put_in_two_phases_when_specified_for_tensor_attr
put(1, *doc, 1);
expect_tensor_attr_calls(1, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
put(2, *doc, 2);
expect_tensor_attr_calls(2, 2);
- assertExecuteHistory({1, 0, 0, 0});
-
- put(3, *doc, 3);
- expect_tensor_attr_calls(3, 3);
- // Note that the prepare step is executed round-robin between the 2 threads.
- assertExecuteHistory({1, 0, 0, 0, 1, 0});
+ expect_shared_executor_tasks(2);
+ assertExecuteHistory({0, 0});
}
TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_attribute)
@@ -917,7 +922,8 @@ TEST_F(TwoPhasePutTest, put_is_ignored_when_serial_number_is_older_or_equal_to_a
attr->commit(7, 7);
put(7, *doc, 1);
expect_tensor_attr_calls(0, 0);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set)
@@ -925,7 +931,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_field_is_not_set)
auto doc = make_no_field_doc();
put(1, *doc, 1);
expect_tensor_attr_calls(0, 0, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
@@ -933,7 +940,8 @@ TEST_F(TwoPhasePutTest, document_is_cleared_if_tensor_in_field_is_not_set)
auto doc = make_no_tensor_doc();
put(1, *doc, 1);
expect_tensor_attr_calls(0, 0, 1);
- assertExecuteHistory({1, 0});
+ expect_shared_executor_tasks(1);
+ assertExecuteHistory({0});
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index a91b60d5166..a49b27caf36 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -12,9 +12,10 @@
#include <vespa/searchcore/proton/common/attribute_updater.h>
#include <vespa/searchlib/attribute/attributevector.hpp>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
-#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
#include <vespa/log/log.h>
@@ -607,8 +608,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI
assert(wc.getFields().size() == 1);
auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc.getFields()[0], extractor);
auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, immediateCommit, onWriteDone);
- // We use the local docid to create an executor id to round-robin between the threads.
- _attributeFieldWriter.executeTask(_attributeFieldWriter.getExecutorId(lid), std::move(prepare_task));
+ _shared_executor.execute(std::move(prepare_task));
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task));
} else {
if (allAttributes || wc.hasStructFieldAttribute()) {
@@ -633,6 +633,7 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immed
AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr)
: _mgr(std::move(mgr)),
_attributeFieldWriter(_mgr->getAttributeFieldWriter()),
+ _shared_executor(_mgr->get_shared_executor()),
_writeContexts(),
_dataType(nullptr),
_hasStructFieldAttribute(false),
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 726379220e3..eaf8abe4872 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -26,6 +26,7 @@ private:
using FieldValue = document::FieldValue;
const IAttributeManager::SP _mgr;
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
+ vespalib::ThreadExecutor& _shared_executor;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
public:
/**
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index 2ace9a8ac6b..18c376a4c2b 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -8,26 +8,36 @@ namespace vespalib { class ISequencedTaskExecutor; }
namespace searchcorespi::index {
/**
- * Interface for the thread model used for write tasks.
+ * Interface for the thread model used for write tasks for a single document database.
*
* We have multiple write threads:
*
- * 1. The master write thread used for the majority of write tasks.
+ * 1. The "master" write thread used for the majority of write tasks.
*
- * 2. The index write thread used for doing changes to the memory
+ * 2. The "index" write thread used for doing changes to the memory
* index, either directly (for data not bound to a field) or via
* index field inverter executor or index field writer executor.
*
- * 3. The index field inverter executor is used to populate field
+ * 3. The "summary" thread is used for doing changes to the document store.
+ *
+ * 4. The "index field inverter" executor is used to populate field
* inverters with data from document fields. Scheduled tasks for
* the same field are executed in sequence.
*
- * 4. The index field writer executor is used to sort data in field
+ * 5. The "index field writer" executor is used to sort data in field
* inverters before pushing the data to the memory field indexes.
* Scheduled tasks for the same field are executed in sequence.
*
- * The master write thread is always the one giving tasks to the index
- * write thread.
+ * 6. The "attribute field writer" executor is used to write data to attribute vectors.
+ * Each attribute is always handled by the same thread,
+ * and scheduled tasks for the same attribute are executed in sequence.
+ *
+ * The master write thread is always the one giving tasks to the other write threads above.
+ *
+ * In addition this interface exposes the "shared" executor that is used by all document databases.
+ * This is among others used for compressing / de-compressing documents in the document store,
+ * merging files as part of disk index fusion, and running the prepare step when doing two-phase
+ * puts against a tensor attribute with a HNSW index.
*
* The index write thread extracts fields from documents and gives
* task to the index field inverter executor and the index field
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
new file mode 100644
index 00000000000..575552971fa
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -0,0 +1,31 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Implementation of the ThreadExecutor interface that runs all tasks in the foreground by the calling thread.
+ */
+class ForegroundThreadExecutor : public vespalib::ThreadExecutor {
+private:
+ std::atomic<size_t> _accepted;
+
+public:
+ ForegroundThreadExecutor() : _accepted(0) { }
+ Task::UP execute(Task::UP task) override {
+ task->run();
+ ++_accepted;
+ return Task::UP();
+ }
+ size_t getNumThreads() const override { return 0; }
+ Stats getStats() override {
+ return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ }
+ virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
+};
+
+}