summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-16 15:11:25 +0100
committerGitHub <noreply@github.com>2021-11-16 15:11:25 +0100
commit16324b48b8547313be4f94badd44401bd2023352 (patch)
treec8998b3032e924bd73fa6149f1cefd41c9705f00
parent625434397bb349ba02469afaa6e9af017f6c2d00 (diff)
parent2a9d507d1205a36ba9d346084eef97cbd311a9da (diff)
Merge pull request #20042 from vespa-engine/toregge/use-gate-callback-to-signal-completion-4
Use GateCallback to signal completion of scheduled tasks.
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp37
1 files changed, 27 insertions, 10 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 587ec8dda1f..f3d90d37e42 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -14,6 +14,8 @@
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
@@ -27,6 +29,7 @@ using namespace search;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
using search::attribute::ImportedAttributeVector;
using search::tensor::PrepareResult;
+using vespalib::GateCallback;
using vespalib::ISequencedTaskExecutor;
namespace proton {
@@ -821,24 +824,38 @@ AttributeWriter::forceCommit(const CommitParam & param, OnWriteDoneType onWriteD
void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [docIdLimit, attr = entry.second.attribute]()
- { applyReplayDone(docIdLimit, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [docIdLimit, attr = entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyReplayDone(docIdLimit, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [wantedLidLimit, serialNum, attr=entry.second.attribute]()
- { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [wantedLidLimit, serialNum, attr=entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyCompactLidSpace(wantedLidLimit, serialNum, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
bool