aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-11-02 11:27:48 +0100
committerTor Egge <Tor.Egge@broadpark.no>2020-11-02 11:27:48 +0100
commite6a1e21f1f7e3c54d28723af08b90aa0a76a8028 (patch)
treec88427ed33d29d1e74066df99231c3508e967121
parentac8b4ebae4796b275ff71cc15eb259a22797a913 (diff)
Apply diff entries using async spi methods during bucket merge.
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp66
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp35
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h30
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp38
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h30
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp45
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h9
10 files changed, 227 insertions, 31 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index f922689b941..971d7b8f410 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_executable(storage_persistence_gtest_runner_app TEST
SOURCES
+ apply_bucket_diff_entry_result_test.cpp
bucketownershipnotifiertest.cpp
has_mask_remapper_test.cpp
mergehandlertest.cpp
diff --git a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp
new file mode 100644
index 00000000000..1eb5f614950
--- /dev/null
+++ b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp
@@ -0,0 +1,66 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h>
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/persistence/spi/result.h>
+#include <gtest/gtest.h>
+
+using document::DocumentId;
+using document::test::makeDocumentBucket;
+
+namespace storage {
+
+using ResultVector = std::vector<ApplyBucketDiffEntryResult>;
+
+namespace {
+
+spi::Result spi_result_ok;
+spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked");
+document::BucketIdFactory bucket_id_factory;
+const char *test_op = "put";
+
+ApplyBucketDiffEntryResult
+make_result(spi::Result &spi_result, const DocumentId &doc_id)
+{
+ std::promise<std::unique_ptr<spi::Result>> result_promise;
+ result_promise.set_value(std::make_unique<spi::Result>(spi_result));
+ spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id)));
+ return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op);
+}
+
+void
+check_results(ResultVector results)
+{
+ for (auto& result : results) {
+ result.check_result();
+ }
+}
+
+}
+
+TEST(ApplyBucketDiffEntryResultTest, ok_results_can_be_checked)
+{
+ ResultVector results;
+ results.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
+ results.push_back(make_result(spi_result_ok, DocumentId("id::test::1")));
+ check_results(std::move(results));
+}
+
+TEST(ApplyBucketDiffEntryResultTest, first_failed_result_throws_exception)
+{
+ ResultVector results;
+ results.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
+ results.push_back(make_result(spi_result_fail, DocumentId("id::test::1")));
+ results.push_back(make_result(spi_result_fail, DocumentId("id::test::2")));
+ try {
+ check_results(std::move(results));
+ FAIL() << "Failed to throw exception for failed result";
+ } catch (std::exception &e) {
+ EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what()));
+ }
+}
+
+}
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 335863322d9..81f98136575 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -896,7 +896,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
ExpectedExceptionSpec exceptions[] = {
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
- { PersistenceProviderWrapper::FAIL_PUT, "Failed put" },
+ { PersistenceProviderWrapper::FAIL_PUT | PersistenceProviderWrapper::FAIL_REMOVE, "Failed put" },
{ PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" },
};
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index ff8d29f7f45..647d7fa1098 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -1,6 +1,8 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_spersistence OBJECT
SOURCES
+ apply_bucket_diff_entry_complete.cpp
+ apply_bucket_diff_entry_result.cpp
asynchandler.cpp
bucketownershipnotifier.cpp
bucketprocessor.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
new file mode 100644
index 00000000000..63700b30ea5
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -0,0 +1,35 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "apply_bucket_diff_entry_complete.h"
+#include <cassert>
+
+namespace storage {
+
+ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
+ : _result_handler(nullptr),
+ _result_promise(std::move(result_promise)),
+ _start_time(clock),
+ _latency_metric(latency_metric)
+{
+}
+
+ApplyBucketDiffEntryComplete::~ApplyBucketDiffEntryComplete() = default;
+
+void
+ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result)
+{
+ if (_result_handler != nullptr) {
+ _result_handler->handle(*result);
+ }
+ _result_promise.set_value(std::move(result));
+ _latency_metric.addValue(_start_time.getElapsedTimeAsDouble());
+}
+
+void
+ApplyBucketDiffEntryComplete::addResultHandler(const spi::ResultHandler* resultHandler)
+{
+ assert(_result_handler == nullptr);
+ _result_handler = resultHandler;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
new file mode 100644
index 00000000000..f1e1d184665
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -0,0 +1,30 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/persistence/spi/operationcomplete.h>
+#include <vespa/storageframework/generic/clock/timer.h>
+#include <vespa/metrics/valuemetric.h>
+#include <future>
+
+namespace storage {
+
+/*
+ * Complete handler for a bucket diff entry spi operation (putAsync
+ * or removeAsync)
+ */
+class ApplyBucketDiffEntryComplete : public spi::OperationComplete
+{
+ using ResultPromise = std::promise<std::unique_ptr<spi::Result>>;
+ const spi::ResultHandler* _result_handler;
+ ResultPromise _result_promise;
+ framework::MilliSecTimer _start_time;
+ metrics::DoubleAverageMetric& _latency_metric;
+public:
+ ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
+ ~ApplyBucketDiffEntryComplete();
+ void onComplete(std::unique_ptr<spi::Result> result) override;
+ void addResultHandler(const spi::ResultHandler* resultHandler) override;
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp
new file mode 100644
index 00000000000..8e81273e617
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp
@@ -0,0 +1,38 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "apply_bucket_diff_entry_result.h"
+#include <vespa/persistence/spi/result.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <cassert>
+
+namespace storage {
+
+ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op)
+ : _future_result(std::move(future_result)),
+ _bucket(bucket),
+ _doc_id(std::move(doc_id)),
+ _op(op)
+{
+}
+
+ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs) = default;
+
+ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default;
+
+void
+ApplyBucketDiffEntryResult::check_result()
+{
+ assert(_future_result.valid());
+ _future_result.wait();
+ auto result = _future_result.get();
+ if (result->hasError()) {
+ vespalib::asciistream ss;
+ ss << "Failed " << _op
+ << " for " << _doc_id.toString()
+ << " in " << _bucket
+ << ": " << result->toString();
+ throw std::runtime_error(ss.str());
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h
new file mode 100644
index 00000000000..f2308ef23a4
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h
@@ -0,0 +1,30 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/document/base/documentid.h>
+#include <vespa/persistence/spi/bucket.h>
+#include <future>
+
+namespace storage::spi { class Result; }
+
+namespace storage {
+
+/*
+ * Result of a bucket diff entry spi operation (putAsync or removeAsync)
+ */
+class ApplyBucketDiffEntryResult {
+ using FutureResult = std::future<std::unique_ptr<spi::Result>>;
+ FutureResult _future_result;
+ spi::Bucket _bucket;
+ document::DocumentId _doc_id;
+ const char* _op;
+
+public:
+ ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op);
+ ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs);
+ ~ApplyBucketDiffEntryResult();
+ void check_result();
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 51b575548d8..7b3fb849f52 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,12 +2,15 @@
#include "mergehandler.h"
#include "persistenceutil.h"
+#include "apply_bucket_diff_entry_complete.h"
+#include "apply_bucket_diff_entry_result.h"
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
#include <algorithm>
+#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
@@ -38,22 +41,6 @@ constexpr int getDeleteFlag() {
* Throws std::runtime_error if result has an error.
*/
void
-checkResult(const spi::Result& result,
- const spi::Bucket& bucket,
- const document::DocumentId& docId,
- const char* op)
-{
- if (result.hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << op
- << " for " << docId.toString()
- << " in " << bucket
- << ": " << result.toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-void
checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
{
if (result.hasError()) {
@@ -493,25 +480,27 @@ MergeHandler::deserializeDiffDocument(
return doc;
}
-void
+ApplyBucketDiffEntryResult
MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
const api::ApplyBucketDiffCommand::Entry& e,
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
+ std::promise<std::unique_ptr<spi::Result>> result_promise;
+ auto future_result = result_promise.get_future();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- framework::MilliSecTimer start_time(_clock);
- checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, docId, "put");
- _env._metrics.merge_handler_metrics.put_latency.addValue(start_time.getElapsedTimeAsDouble());
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.put_latency);
+ _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
+ return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put");
} else {
DocumentId docId(e._docName);
- framework::MilliSecTimer start_time(_clock);
- checkResult(_spi.remove(bucket, timestamp, docId, context), bucket, docId, "remove");
- _env._metrics.merge_handler_metrics.remove_latency.addValue(start_time.getElapsedTimeAsDouble());
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete));
+ return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove");
}
}
@@ -534,6 +523,7 @@ MergeHandler::applyDiffLocally(
uint32_t byteCount = 0;
uint32_t addedCount = 0;
uint32_t notNeededByteCount = 0;
+ std::vector<ApplyBucketDiffEntryResult> async_results;
std::vector<spi::DocEntry::UP> entries;
populateMetaData(bucket, MAX_TIMESTAMP, entries, context);
@@ -572,7 +562,7 @@ MergeHandler::applyDiffLocally(
++i;
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(bucket, e, context, repo);
+ async_results.push_back(applyDiffEntry(bucket, e, context, repo));
} else {
assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
// Diffing for existing timestamp; should either both be put
@@ -585,7 +575,7 @@ MergeHandler::applyDiffLocally(
"timestamp in %s. Diff slot: %s. Existing slot: %s",
bucket.toString().c_str(), e.toString().c_str(),
existing.toString().c_str());
- applyDiffEntry(bucket, e, context, repo);
+ async_results.push_back(applyDiffEntry(bucket, e, context, repo));
} else {
// Duplicate put, just ignore it.
LOG(debug, "During diff apply, attempting to add slot "
@@ -617,9 +607,12 @@ MergeHandler::applyDiffLocally(
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(bucket, e, context, repo);
+ async_results.push_back(applyDiffEntry(bucket, e, context, repo));
byteCount += e._headerBlob.size() + e._bodyBlob.size();
}
+ for (auto &result_to_check : async_results) {
+ result_to_check.check_result();
+ }
if (byteCount + notNeededByteCount != 0) {
_env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 5e65e1a39ec..64b1448577a 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -24,6 +24,7 @@ namespace storage {
namespace spi { struct PersistenceProvider; }
class PersistenceUtil;
+class ApplyBucketDiffEntryResult;
class MergeHandler : public Types {
@@ -82,10 +83,10 @@ private:
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
*/
- void applyDiffEntry(const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&,
- spi::Context& context,
- const document::DocumentTypeRepo& repo) const;
+ ApplyBucketDiffEntryResult applyDiffEntry(const spi::Bucket&,
+ const api::ApplyBucketDiffCommand::Entry&,
+ spi::Context& context,
+ const document::DocumentTypeRepo& repo) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,