summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2021-10-22 14:09:48 +0200
committerGitHub <noreply@github.com>2021-10-22 14:09:48 +0200
commit4eadb28ec148e758562c6627f730f3e4022bb097 (patch)
tree4d1fa06e02a7e0411049633227fb1136dcd9797d
parent2e24dcc2305544da1a3eb281a302ac15bddec9f2 (diff)
parent2cb33e26a4950b302fc67c148ebf76db168eeee6 (diff)
Merge pull request #19671 from vespa-engine/toregge/delay-replies-for-async-apply-bucket-diff
Delay replies for async apply bucket diff.
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp5
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp119
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp36
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h12
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.h5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp64
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h8
8 files changed, 227 insertions, 42 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index fe77477fa77..d51485df38d 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -11,6 +11,8 @@
using document::DocumentId;
using document::test::makeDocumentBucket;
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
namespace storage {
@@ -72,6 +74,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test
public:
uint32_t sync_count;
DummyMergeBucketInfoSyncer syncer;
+ MonitoredRefCount monitored_ref_count;
ApplyBucketDiffStateTestBase()
: ::testing::Test(),
@@ -83,7 +86,7 @@ public:
~ApplyBucketDiffStateTestBase();
std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket));
+ return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 60030004594..017b8ce2b92 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -20,7 +20,12 @@ using namespace ::testing;
namespace storage {
-struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
+/*
+ * Class for testing merge handler taking async_apply_bucket_diff as
+ * parameter for the test.
+ */
+struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
+ public testing::WithParamInterface<bool> {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
int _counter;
MessageSenderStub _stub;
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
+ void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler);
};
+ void convert_delayed_error_to_exception(MergeHandler& handler);
+
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam());
+ }
+
+ std::shared_ptr<api::StorageMessage> get_queued_reply() {
+ std::shared_ptr<api::StorageMessage> msg;
+ if (_replySender.queue.getNext(msg, 0s)) {
+ return msg;
+ } else {
+ return {};
+ }
+
}
};
@@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_F(MergeHandlerTest, merge_bucket_command) {
+TEST_P(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_F(MergeHandlerTest, master_message_flow) {
+TEST_P(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_F(MergeHandlerTest, max_timestamp) {
+TEST_P(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_F(MergeHandlerTest, spi_flush_guard) {
+TEST_P(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
EXPECT_EQ(0x0, diff[0]._entry._hasMask);
}
+void
+MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler)
+{
+ handler.drain_async_writes();
+ if (getEnv()._fileStorHandler.isMerging(_bucket)) {
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ api::ReturnCode return_code;
+ s->check_delayed_error(return_code);
+ if (return_code.failed()) {
+ getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code);
+ fetchSingleMessage<api::ApplyBucketDiffReply>();
+ fetchSingleMessage<api::ApplyBucketDiffCommand>();
+ throw std::runtime_error(return_code.getMessage());
+ }
+ }
+}
+
std::string
MergeHandlerTest::doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -855,7 +896,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -888,7 +929,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -953,7 +994,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1006,6 +1047,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
}
void
+MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler)
+{
+ handler.drain_async_writes();
+ if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) {
+ auto chained_reply = _stub.replies.back();
+ _stub.replies.pop_back();
+ test.messageKeeper().sendReply(chained_reply);
+ throw std::runtime_error(chained_reply->getResult().getMessage());
+ }
+}
+
+void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
@@ -1016,6 +1069,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
+ if (test.GetParam()) {
+ convert_delayed_error_to_exception(test, handler);
+ }
}
std::string
@@ -1039,7 +1095,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1066,7 +1122,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_F(MergeHandlerTest, remove_from_diff) {
+TEST_P(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1132,7 +1188,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1156,8 +1212,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
+ if (GetParam()) {
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ } else {
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ }
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -1246,7 +1309,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1382,4 +1445,6 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
LOG(debug, "got mergebucket reply");
}
+VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index eb7a5ef5bc6..ad153c41aef 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -2,21 +2,27 @@
#include "apply_bucket_diff_state.h"
#include "mergehandler.h"
+#include "persistenceutil.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/vespalib/stllike/asciistream.h>
using storage::spi::Result;
+using vespalib::RetainGuard;
namespace storage {
-ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket)
+ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
_fail_message(),
_failed_flag(),
_stale_bucket_info(false),
- _promise()
+ _promise(),
+ _tracker(),
+ _delayed_reply(),
+ _sender(nullptr),
+ _retain_guard(std::move(retain_guard))
{
}
@@ -32,6 +38,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState()
if (_promise.has_value()) {
_promise.value().set_value(_fail_message);
}
+ if (_delayed_reply) {
+ if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) {
+ _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message));
+ }
+ if (_sender) {
+ _sender->sendReply(std::move(_delayed_reply));
+ } else {
+ // _tracker->_reply and _delayed_reply points to the same reply.
+ _tracker->sendReply();
+ }
+ }
}
void
@@ -69,4 +86,19 @@ ApplyBucketDiffState::get_future()
return _promise.value().get_future();
}
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _delayed_reply = std::move(delayed_reply);
+}
+
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _sender = &sender;
+ _delayed_reply = std::move(delayed_reply);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index af4174b06d6..39f60156e66 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -3,16 +3,20 @@
#pragma once
#include <vespa/persistence/spi/bucket.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
#include <vector>
namespace document { class DocumentId; }
+namespace storage::api { class StorageReply; }
namespace storage::spi { class Result; }
namespace storage {
class ApplyBucketDiffEntryResult;
+class MessageSender;
+class MessageTracker;
class MergeBucketInfoSyncer;
/*
@@ -26,9 +30,13 @@ class ApplyBucketDiffState {
std::atomic_flag _failed_flag;
bool _stale_bucket_info;
std::optional<std::promise<vespalib::string>> _promise;
+ std::unique_ptr<MessageTracker> _tracker;
+ std::shared_ptr<api::StorageReply> _delayed_reply;
+ MessageSender* _sender;
+ vespalib::RetainGuard _retain_guard;
public:
- ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket);
+ ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -36,6 +44,8 @@ public:
void mark_stale_bucket_info();
void sync_bucket_info();
std::future<vespalib::string> get_future();
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
};
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index febac5b87e5..1d9e0c6fae6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock,
uint32_t traceLevel)
: reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock),
+ delayed_error(),
context(priority, traceLevel)
{}
@@ -122,4 +123,23 @@ MergeStatus::print(std::ostream& out, bool verbose,
}
}
+void
+MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in)
+{
+ delayed_error = std::move(delayed_error_in);
+}
+
+void
+MergeStatus::check_delayed_error(api::ReturnCode &return_code)
+{
+ if (!return_code.failed() && delayed_error.has_value()) {
+ // Wait for pending writes to local node to complete and check error
+ auto& future_error = delayed_error.value();
+ future_error.wait();
+ vespalib::string fail_message = future_error.get();
+ delayed_error.reset();
+ return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message));
+ }
+}
+
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index b28ca4e373a..05ffd1336a2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -9,7 +9,9 @@
#include <vector>
#include <deque>
+#include <future>
#include <memory>
+#include <optional>
namespace storage {
@@ -25,6 +27,7 @@ public:
std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff;
vespalib::duration timeout;
framework::MilliSecTimer startTime;
+ std::optional<std::future<vespalib::string>> delayed_error;
spi::Context context;
MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel);
@@ -40,6 +43,8 @@ public:
bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes);
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool isFirstNode() const { return static_cast<bool>(reply); }
+ void set_delayed_error(std::future<vespalib::string>&& delayed_error_in);
+ void check_delayed_error(api::ReturnCode &return_code);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 17a16487ac4..6d58966d415 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -17,6 +17,9 @@
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
+
namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
@@ -28,12 +31,19 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
_async_apply_bucket_diff(async_apply_bucket_diff)
{
}
+MergeHandler::~MergeHandler()
+{
+ drain_async_writes();
+}
+
+
namespace {
constexpr int getDeleteFlag() {
@@ -674,7 +684,8 @@ namespace {
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
- MessageSender& sender, spi::Context& context) const
+ MessageSender& sender, spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const
{
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
@@ -806,6 +817,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
}
cmd->setPriority(status.context.getPriority());
cmd->setTimeout(status.timeout);
+ if (async_results) {
+ // Check currently pending writes to local node before sending new command.
+ check_apply_diff_sync(std::move(async_results));
+ }
if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) {
framework::MilliSecTimer startTime(_clock);
fetchLocalData(bucket, cmd->getDiff(), 0, context);
@@ -1171,7 +1186,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
@@ -1211,7 +1227,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1233,10 +1249,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
+ if (!_async_apply_bucket_diff) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1260,10 +1279,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
}
- tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd));
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd);
+ tracker->setReply(reply);
static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff());
LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.",
bucket.toString().c_str(), cmd.getNodes()[index - 1].index);
+ if (async_results) {
+ async_results->set_delayed_reply(std::move(tracker), std::move(reply));
+ }
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
@@ -1280,6 +1303,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
cmd2->setPriority(cmd.getPriority());
cmd2->setTimeout(cmd.getTimeout());
s->pendingId = cmd2->getMsgId();
+ if (async_results) {
+ // Reply handler should check for delayed error.
+ s->set_delayed_error(async_results->get_future());
+ }
_env._fileStorHandler.sendCommand(cmd2);
// Everything went fine. Don't delete state but wait for reply
stateGuard.deactivate();
@@ -1290,12 +1317,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const
{
- (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
@@ -1316,6 +1342,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
api::StorageReply::SP replyToSend;
// Process apply bucket diff locally
api::ReturnCode returnCode = reply.getResult();
+ // Check for delayed error from handleApplyBucketDiff
+ s->check_delayed_error(returnCode);
try {
if (reply.getResult().failed()) {
LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str());
@@ -1329,9 +1357,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
+ if (!_async_apply_bucket_diff) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1370,7 +1401,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
// Should reply now, since we failed.
replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
@@ -1392,6 +1423,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
throw;
}
+ if (async_results && replyToSend) {
+ replyToSend->setResult(returnCode);
+ async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend));
+ }
if (clearState) {
_env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
@@ -1402,4 +1437,13 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
+void
+MergeHandler::drain_async_writes()
+{
+ if (_monitored_ref_count) {
+ // Wait for related ApplyBucketDiffState objects to be destroyed
+ _monitored_ref_count->waitForZeroRefCount();
+ }
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f6e8ddcf306..cc06f769812 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -20,6 +20,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
namespace storage {
@@ -48,6 +49,8 @@ public:
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
+ ~MergeHandler();
+
bool buildBucketInfoList(
const spi::Bucket& bucket,
Timestamp maxTimestamp,
@@ -70,12 +73,14 @@ public:
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
+ void drain_async_writes();
private:
const framework::Clock &_clock;
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
const bool _async_apply_bucket_diff;
@@ -84,7 +89,8 @@ private:
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
MessageSender& sender,
- spi::Context& context) const;
+ spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI