summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-16 14:50:02 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-16 14:50:02 +0100
commit2a9d507d1205a36ba9d346084eef97cbd311a9da (patch)
tree881fab3208e02fdec13ae01afb7b35b713b80160
parent4b155495f7d05176364ccfb86b12f2a7ac1f37cf (diff)
Use GateCallback to signal completion of scheduled tasks.
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp37
1 files changed, 27 insertions, 10 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 587ec8dda1f..f3d90d37e42 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -14,6 +14,8 @@
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
@@ -27,6 +29,7 @@ using namespace search;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
using search::attribute::ImportedAttributeVector;
using search::tensor::PrepareResult;
+using vespalib::GateCallback;
using vespalib::ISequencedTaskExecutor;
namespace proton {
@@ -821,24 +824,38 @@ AttributeWriter::forceCommit(const CommitParam & param, OnWriteDoneType onWriteD
void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [docIdLimit, attr = entry.second.attribute]()
- { applyReplayDone(docIdLimit, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [docIdLimit, attr = entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyReplayDone(docIdLimit, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [wantedLidLimit, serialNum, attr=entry.second.attribute]()
- { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [wantedLidLimit, serialNum, attr=entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyCompactLidSpace(wantedLidLimit, serialNum, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
bool