diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-16 15:11:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-16 15:11:25 +0100 |
commit | 16324b48b8547313be4f94badd44401bd2023352 (patch) | |
tree | c8998b3032e924bd73fa6149f1cefd41c9705f00 | |
parent | 625434397bb349ba02469afaa6e9af017f6c2d00 (diff) | |
parent | 2a9d507d1205a36ba9d346084eef97cbd311a9da (diff) |
Merge pull request #20042 from vespa-engine/toregge/use-gate-callback-to-signal-completion-4
Use GateCallback to signal completion of scheduled tasks.
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 587ec8dda1f..f3d90d37e42 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -14,6 +14,8 @@ #include <vespa/searchlib/attribute/imported_attribute_vector.h> #include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/threadexecutor.h> #include <future> @@ -27,6 +29,7 @@ using namespace search; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; using search::tensor::PrepareResult; +using vespalib::GateCallback; using vespalib::ISequencedTaskExecutor; namespace proton { @@ -821,24 +824,38 @@ AttributeWriter::forceCommit(const CommitParam & param, OnWriteDoneType onWriteD void AttributeWriter::onReplayDone(uint32_t docIdLimit) { - for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.executor_id, - [docIdLimit, attr = entry.second.attribute]() - { applyReplayDone(docIdLimit, *attr); }); + vespalib::Gate gate; + { + auto on_write_done = std::make_shared<GateCallback>(gate); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.executor_id, + [docIdLimit, attr = entry.second.attribute, on_write_done]() + { + (void) on_write_done; + applyReplayDone(docIdLimit, *attr); + }); + } } - _attributeFieldWriter.sync_all(); + gate.await(); } void AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum) { - for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.executor_id, - [wantedLidLimit, serialNum, attr=entry.second.attribute]() - { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); }); + vespalib::Gate gate; + { + auto on_write_done = std::make_shared<GateCallback>(gate); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.executor_id, + [wantedLidLimit, serialNum, attr=entry.second.attribute, on_write_done]() + { + (void) on_write_done; + applyCompactLidSpace(wantedLidLimit, serialNum, *attr); + }); + } } - _attributeFieldWriter.sync_all(); + gate.await(); } bool |