aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-10-20 12:41:11 +0200
committerTor Egge <Tor.Egge@online.no>2021-10-20 12:41:11 +0200
commitf19fc4ebc9010db3bd35f53923866c62e2faefa9 (patch)
tree37566230c57b279de5fa520c50f81c1f0c1a5eb6
parent25c60f5b960db2a34e05e8466bb0ff62c98a236a (diff)
Eliminate ApplyBucketDiffEntryResult.
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp60
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h16
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h32
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp69
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h14
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp45
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h11
10 files changed, 128 insertions, 173 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index 6e1e62d2080..fe77477fa77 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -1,6 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h>
#include <vespa/storage/persistence/apply_bucket_diff_state.h>
#include <vespa/storage/persistence/merge_bucket_info_syncer.h>
#include <vespa/document/base/documentid.h>
@@ -26,38 +25,44 @@ document::Bucket dummy_document_bucket(makeDocumentBucket(document::BucketId(0,
class DummyMergeBucketInfoSyncer : public MergeBucketInfoSyncer
{
uint32_t& _sync_count;
+ vespalib::string _fail;
public:
DummyMergeBucketInfoSyncer(uint32_t& sync_count)
: MergeBucketInfoSyncer(),
- _sync_count(sync_count)
+ _sync_count(sync_count),
+ _fail()
{
}
+ ~DummyMergeBucketInfoSyncer();
void sync_bucket_info(const spi::Bucket& bucket) const override {
EXPECT_EQ(bucket, spi::Bucket(dummy_document_bucket));
++_sync_count;
+ if (!_fail.empty()) {
+ throw std::runtime_error(_fail);
+ }
}
+ void set_fail(vespalib::string fail) { _fail = std::move(fail); }
};
-ApplyBucketDiffEntryResult
-make_result(spi::Result &spi_result, const DocumentId &doc_id)
+DummyMergeBucketInfoSyncer::~DummyMergeBucketInfoSyncer() = default;
+
+void
+make_result(ApplyBucketDiffState& state, spi::Result &spi_result, const DocumentId &doc_id)
{
- std::promise<std::unique_ptr<spi::Result>> result_promise;
- result_promise.set_value(std::make_unique<spi::Result>(spi_result));
- spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id)));
- return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op);
+ state.on_entry_complete(std::make_unique<spi::Result>(spi_result), doc_id, test_op);
}
void push_ok(ApplyBucketDiffState &state)
{
- state.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
- state.push_back(make_result(spi_result_ok, DocumentId("id::test::1")));
+ make_result(state, spi_result_ok, DocumentId("id::test::0"));
+ make_result(state, spi_result_ok, DocumentId("id::test::1"));
}
void push_bad(ApplyBucketDiffState &state)
{
- state.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
- state.push_back(make_result(spi_result_fail, DocumentId("id::test::1")));
- state.push_back(make_result(spi_result_fail, DocumentId("id::test::2")));
+ make_result(state, spi_result_ok, DocumentId("id::test::0"));
+ make_result(state, spi_result_fail, DocumentId("id::test::1"));
+ make_result(state, spi_result_fail, DocumentId("id::test::2"));
}
}
@@ -75,15 +80,19 @@ public:
{
}
+ ~ApplyBucketDiffStateTestBase();
+
std::unique_ptr<ApplyBucketDiffState> make_state() {
return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket));
}
};
+ApplyBucketDiffStateTestBase::~ApplyBucketDiffStateTestBase() = default;
+
class ApplyBucketDiffStateTest : public ApplyBucketDiffStateTestBase
{
public:
- std::unique_ptr<ApplyBucketDiffState> state;
+ std::shared_ptr<ApplyBucketDiffState> state;
ApplyBucketDiffStateTest()
: ApplyBucketDiffStateTestBase(),
@@ -95,13 +104,14 @@ public:
state = make_state();
}
+ void check_failure(std::string expected) {
+ auto future = state->get_future();
+ state.reset();
+ std::string fail_message = future.get();
+ EXPECT_EQ(expected, fail_message);
+ }
void check_failure() {
- try {
- state->check();
- FAIL() << "Failed to throw exception for failed result";
- } catch (std::exception &e) {
- EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what()));
- }
+ check_failure("Failed put for id::test::1 in Bucket(0x0000000000000010): Result(5, write blocked)");
}
};
@@ -109,7 +119,7 @@ public:
TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked)
{
push_ok(*state);
- state->check();
+ check_failure("");
}
TEST_F(ApplyBucketDiffStateTest, failed_result_errors_ignored)
@@ -146,4 +156,12 @@ TEST_F(ApplyBucketDiffStateTest, explicit_sync_bucket_info_works)
EXPECT_EQ(1, sync_count);
}
+TEST_F(ApplyBucketDiffStateTest, failed_sync_bucket_info_is_detected)
+{
+ vespalib::string fail("sync bucket failed");
+ syncer.set_fail(fail);
+ state->mark_stale_bucket_info();
+ check_failure(fail);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 4b41d3fa778..c737d2bed28 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -2,7 +2,6 @@
vespa_add_library(storage_spersistence OBJECT
SOURCES
apply_bucket_diff_entry_complete.cpp
- apply_bucket_diff_entry_result.cpp
apply_bucket_diff_state.cpp
asynchandler.cpp
bucketownershipnotifier.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 034089d4eca..1fbe155b16d 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -1,14 +1,17 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "apply_bucket_diff_entry_complete.h"
+#include "apply_bucket_diff_state.h"
#include <vespa/persistence/spi/result.h>
#include <cassert>
namespace storage {
-ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
+ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
: _result_handler(nullptr),
- _result_promise(std::move(result_promise)),
+ _state(std::move(state)),
+ _doc_id(std::move(doc_id)),
+ _op(op),
_start_time(clock),
_latency_metric(latency_metric)
{
@@ -24,7 +27,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result)
}
double elapsed = _start_time.getElapsedTimeAsDouble();
_latency_metric.addValue(elapsed);
- _result_promise.set_value(std::move(result));
+ _state->on_entry_complete(std::move(result), _doc_id, _op);
}
void
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 6f05ed82fa0..dd2346d9dee 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -2,6 +2,7 @@
#pragma once
+#include <vespa/document/base/documentid.h>
#include <vespa/metrics/valuemetric.h>
#include <vespa/persistence/spi/operationcomplete.h>
#include <vespa/storageframework/generic/clock/timer.h>
@@ -9,19 +10,22 @@
namespace storage {
+class ApplyBucketDiffState;
+
/*
* Complete handler for a bucket diff entry spi operation (putAsync
* or removeAsync)
*/
class ApplyBucketDiffEntryComplete : public spi::OperationComplete
{
- using ResultPromise = std::promise<std::unique_ptr<spi::Result>>;
- const spi::ResultHandler* _result_handler;
- ResultPromise _result_promise;
- framework::MilliSecTimer _start_time;
- metrics::DoubleAverageMetric& _latency_metric;
+ const spi::ResultHandler* _result_handler;
+ std::shared_ptr<ApplyBucketDiffState> _state;
+ document::DocumentId _doc_id;
+ const char* _op;
+ framework::MilliSecTimer _start_time;
+ metrics::DoubleAverageMetric& _latency_metric;
public:
- ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
+ ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
~ApplyBucketDiffEntryComplete();
void onComplete(std::unique_ptr<spi::Result> result) override;
void addResultHandler(const spi::ResultHandler* resultHandler) override;
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp
deleted file mode 100644
index 9cb5bb1bfd0..00000000000
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "apply_bucket_diff_entry_result.h"
-#include <vespa/persistence/spi/result.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <cassert>
-
-namespace storage {
-
-ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op)
- : _future_result(std::move(future_result)),
- _bucket(bucket),
- _doc_id(std::move(doc_id)),
- _op(op)
-{
-}
-
-ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs) = default;
-
-ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default;
-
-void
-ApplyBucketDiffEntryResult::wait()
-{
- assert(_future_result.valid());
- _future_result.wait();
-}
-
-void
-ApplyBucketDiffEntryResult::check_result()
-{
- assert(_future_result.valid());
- auto result = _future_result.get();
- if (result->hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << _op
- << " for " << _doc_id.toString()
- << " in " << _bucket
- << ": " << result->toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h
deleted file mode 100644
index 662656f2dbc..00000000000
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/document/base/documentid.h>
-#include <vespa/persistence/spi/bucket.h>
-#include <vespa/metrics/valuemetric.h>
-#include <future>
-
-namespace storage::spi { class Result; }
-
-namespace storage {
-
-/*
- * Result of a bucket diff entry spi operation (putAsync or removeAsync)
- */
-class ApplyBucketDiffEntryResult {
- using FutureResult = std::future<std::unique_ptr<spi::Result>>;
- FutureResult _future_result;
- spi::Bucket _bucket;
- document::DocumentId _doc_id;
- const char* _op;
-
-public:
- ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op);
- ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs);
- ~ApplyBucketDiffEntryResult();
- void wait();
- void check_result();
-};
-
-}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index c754796ba51..eb7a5ef5bc6 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -1,64 +1,50 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "apply_bucket_diff_state.h"
-#include "apply_bucket_diff_entry_result.h"
#include "mergehandler.h"
+#include <vespa/document/base/documentid.h>
+#include <vespa/persistence/spi/result.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+
+using storage::spi::Result;
namespace storage {
ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket)
- : _async_results(),
- _merge_bucket_info_syncer(merge_bucket_info_syncer),
+ : _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
- _stale_bucket_info(false)
+ _fail_message(),
+ _failed_flag(),
+ _stale_bucket_info(false),
+ _promise()
{
}
ApplyBucketDiffState::~ApplyBucketDiffState()
{
- wait();
try {
sync_bucket_info();
- } catch (std::exception&) {
- }
-}
-
-bool
-ApplyBucketDiffState::empty() const
-{
- return _async_results.empty();
-}
-
-void
-ApplyBucketDiffState::wait()
-{
- if (!_async_results.empty()) {
- _async_results.back().wait();
+ } catch (std::exception& e) {
+ if (_fail_message.empty()) {
+ _fail_message = e.what();
+ }
}
- for (auto &result_to_check : _async_results) {
- result_to_check.wait();
+ if (_promise.has_value()) {
+ _promise.value().set_value(_fail_message);
}
}
void
-ApplyBucketDiffState::check()
+ApplyBucketDiffState::on_entry_complete(std::unique_ptr<Result> result, const document::DocumentId &doc_id, const char *op)
{
- wait();
- try {
- for (auto& result_to_check : _async_results) {
- result_to_check.check_result();
- }
- } catch (std::exception&) {
- _async_results.clear();
- throw;
+ if (result->hasError() && !_failed_flag.test_and_set()) {
+ vespalib::asciistream ss;
+ ss << "Failed " << op
+ << " for " << doc_id.toString()
+ << " in " << _bucket
+ << ": " << result->toString();
+ _fail_message = ss.str();
}
- _async_results.clear();
-}
-
-void
-ApplyBucketDiffState::push_back(ApplyBucketDiffEntryResult&& result)
-{
- _async_results.push_back(std::move(result));
}
void
@@ -76,4 +62,11 @@ ApplyBucketDiffState::sync_bucket_info()
}
}
+std::future<vespalib::string>
+ApplyBucketDiffState::get_future()
+{
+ _promise = std::promise<vespalib::string>();
+ return _promise.value().get_future();
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 489e75e4a72..af4174b06d6 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -3,8 +3,13 @@
#pragma once
#include <vespa/persistence/spi/bucket.h>
+#include <future>
+#include <memory>
#include <vector>
+namespace document { class DocumentId; }
+namespace storage::spi { class Result; }
+
namespace storage {
class ApplyBucketDiffEntryResult;
@@ -15,19 +20,22 @@ class MergeBucketInfoSyncer;
* for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
*/
class ApplyBucketDiffState {
- std::vector<ApplyBucketDiffEntryResult> _async_results;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
spi::Bucket _bucket;
+ vespalib::string _fail_message;
+ std::atomic_flag _failed_flag;
bool _stale_bucket_info;
+ std::optional<std::promise<vespalib::string>> _promise;
+
public:
ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket);
~ApplyBucketDiffState();
- void push_back(ApplyBucketDiffEntryResult&& result);
- bool empty() const;
+ void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
void check();
void mark_stale_bucket_info();
void sync_bucket_info();
+ std::future<vespalib::string> get_future();
};
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 274aba303fb..d2737089bba 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -3,7 +3,6 @@
#include "mergehandler.h"
#include "persistenceutil.h"
#include "apply_bucket_diff_entry_complete.h"
-#include "apply_bucket_diff_entry_result.h"
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
@@ -96,6 +95,17 @@ struct DiffEntryTimestampPredicate {
}
};
+
+void check_apply_diff_sync(std::shared_ptr<ApplyBucketDiffState> async_results) {
+ auto future = async_results->get_future();
+ async_results.reset();
+ future.wait();
+ auto fail_message = future.get();
+ if (!fail_message.empty()) {
+ throw std::runtime_error(fail_message);
+ }
+}
+
} // anonymous namespace
void
@@ -483,27 +493,24 @@ MergeHandler::deserializeDiffDocument(
return doc;
}
-ApplyBucketDiffEntryResult
-MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
+void
+MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
+ const spi::Bucket& bucket,
const api::ApplyBucketDiffCommand::Entry& e,
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
- std::promise<std::unique_ptr<spi::Result>> result_promise;
- auto future_result = result_promise.get_future();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.put_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
- return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put");
} else {
DocumentId docId(e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
_spi.removeAsync(bucket, timestamp, docId, context, std::move(complete));
- return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove");
}
}
@@ -516,7 +523,7 @@ MergeHandler::applyDiffLocally(
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
spi::Context& context,
- ApplyBucketDiffState& async_results) const
+ std::shared_ptr<ApplyBucketDiffState> async_results) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
@@ -527,7 +534,7 @@ MergeHandler::applyDiffLocally(
uint32_t addedCount = 0;
uint32_t notNeededByteCount = 0;
- async_results.mark_stale_bucket_info();
+ async_results->mark_stale_bucket_info();
std::vector<spi::DocEntry::UP> entries;
populateMetaData(bucket, MAX_TIMESTAMP, entries, context);
@@ -565,7 +572,7 @@ MergeHandler::applyDiffLocally(
++i;
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- async_results.push_back(applyDiffEntry(bucket, e, context, repo));
+ applyDiffEntry(async_results, bucket, e, context, repo);
} else {
assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
// Diffing for existing timestamp; should either both be put
@@ -578,7 +585,7 @@ MergeHandler::applyDiffLocally(
"timestamp in %s. Diff slot: %s. Existing slot: %s",
bucket.toString().c_str(), e.toString().c_str(),
existing.toString().c_str());
- async_results.push_back(applyDiffEntry(bucket, e, context, repo));
+ applyDiffEntry(async_results, bucket, e, context, repo);
} else {
// Duplicate put, just ignore it.
LOG(debug, "During diff apply, attempting to add slot "
@@ -610,7 +617,7 @@ MergeHandler::applyDiffLocally(
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- async_results.push_back(applyDiffEntry(bucket, e, context, repo));
+ applyDiffEntry(async_results, bucket, e, context, repo);
byteCount += e._headerBlob.size() + e._bodyBlob.size();
}
if (byteCount + notNeededByteCount != 0) {
@@ -1204,7 +1211,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket());
- ApplyBucketDiffState async_results(*this, bucket);
+ auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1229,8 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
- async_results.check();
- async_results.sync_bucket_info();
+ check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1289,7 +1295,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
(void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
- ApplyBucketDiffState async_results(*this, bucket);
+ auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
@@ -1325,8 +1331,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
framework::MilliSecTimer startTime(_clock);
applyDiffLocally(bucket, diff, index, s->context, async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
- async_results.check();
- async_results.sync_bucket_info();
+ check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 0ff8f3c0ef8..f6e8ddcf306 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -62,7 +62,7 @@ public:
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
spi::Context& context,
- ApplyBucketDiffState& async_results) const;
+ std::shared_ptr<ApplyBucketDiffState> async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
@@ -90,10 +90,11 @@ private:
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
*/
- ApplyBucketDiffEntryResult applyDiffEntry(const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&,
- spi::Context& context,
- const document::DocumentTypeRepo& repo) const;
+ void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
+ const spi::Bucket&,
+ const api::ApplyBucketDiffCommand::Entry&,
+ spi::Context& context,
+ const document::DocumentTypeRepo& repo) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,