summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp5
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp12
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h4
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp6
-rw-r--r--storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp8
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp15
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h5
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp26
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp292
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h4
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h1
18 files changed, 231 insertions, 197 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index d51485df38d..701e8a80d3a 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -43,6 +43,7 @@ public:
throw std::runtime_error(_fail);
}
}
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override { }
void set_fail(vespalib::string fail) { _fail = std::move(fail); }
};
@@ -85,8 +86,8 @@ public:
~ApplyBucketDiffStateTestBase();
- std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
+ std::shared_ptr<ApplyBucketDiffState> make_state() {
+ return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index b3bd1c6a253..02b43a32df3 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -24,7 +24,7 @@
#define CHECK_ERROR_ASYNC(className, failType, onError) \
{ \
- Guard guard(_lock); \
+ Guard guard(_lock); \
if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
return; \
@@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const
return _spi.listBuckets(bucketSpace);
}
-spi::Result
-PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
LOG_SPI("createBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET);
- return _spi.createBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete);
+ return _spi.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketInfoResult
@@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
void
PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
- spi::OperationComplete::UP operationComplete)
+ spi::OperationComplete::UP operationComplete) noexcept
{
LOG_SPI("deleteBucket(" << bucket << ")");
CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index c6628814dba..cfc7002a643 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
spi::OperationComplete::UP up) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
@@ -111,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..a3f0182ba30 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,13 +62,13 @@ public:
return PersistenceProviderWrapper::getBucketInfo(bucket);
}
- spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_createBucketInvocations;
- return PersistenceProviderWrapper::createBucket(bucket, ctx);
+ PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
}
void
- deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_deleteBucketInvocations;
PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
diff --git a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
index ef71f0ae5f0..588b390cd5f 100644
--- a/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
+++ b/storage/src/tests/persistence/filestorage/sanitycheckeddeletetest.cpp
@@ -84,10 +84,10 @@ TEST_F(SanityCheckedDeleteTest, differing_document_sizes_not_considered_out_of_s
c.top.sendDown(delete_cmd);
c.top.waitForMessages(1, MSG_WAIT_TIME);
- // Bucket should now well and truly be gone. Will trigger a getBucketInfo error response.
- spi::BucketInfoResult info_post_delete(
- _node->getPersistenceProvider().getBucketInfo(spiBucket));
- ASSERT_TRUE(info_post_delete.hasError()) << info_post_delete.getErrorMessage();
+ auto reply = c.top.getAndRemoveMessage(api::MessageType::DELETEBUCKET_REPLY);
+ auto delete_reply = std::dynamic_pointer_cast<api::DeleteBucketReply>(reply);
+ ASSERT_TRUE(delete_reply);
+ ASSERT_TRUE(delete_reply->getResult().success());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 017b8ce2b92..ed50730d79f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -167,11 +167,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64, GetParam());
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
@@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -1440,6 +1438,9 @@ TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
+ if (GetParam()) {
+ handler.drain_async_writes();
+ }
ASSERT_EQ(6u, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::MERGEBUCKET_REPLY, messageKeeper()._msgs[5]->getType());
LOG(debug, "got mergebucket reply");
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index ad153c41aef..556760b347e 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -12,6 +12,14 @@ using vespalib::RetainGuard;
namespace storage {
+class ApplyBucketDiffState::Deleter {
+public:
+ void operator()(ApplyBucketDiffState *raw_state) const noexcept {
+ std::unique_ptr<ApplyBucketDiffState> state(raw_state);
+ raw_state->_merge_bucket_info_syncer.schedule_delayed_delete(std::move(state));
+ }
+};
+
ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
@@ -101,4 +109,11 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke
_delayed_reply = std::move(delayed_reply);
}
+std::shared_ptr<ApplyBucketDiffState>
+ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+{
+ std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard)));
+ return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter());
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 39f60156e66..7157c69191b 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -24,6 +24,7 @@ class MergeBucketInfoSyncer;
* for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
*/
class ApplyBucketDiffState {
+ class Deleter;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
spi::Bucket _bucket;
vespalib::string _fail_message;
@@ -35,8 +36,9 @@ class ApplyBucketDiffState {
MessageSender* _sender;
vespalib::RetainGuard _retain_guard;
-public:
ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
+public:
+ static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -46,6 +48,7 @@ public:
std::future<vespalib::string> get_future();
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..bc6e67578c0 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -5,6 +5,7 @@
#include "testandsethelper.h"
#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ tracker->sendReply();
+ });
+
+ if (cmd.getActive()) {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ } else {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.deleteBuckets);
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
index e05991ad9e3..b3386c591e6 100644
--- a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
+++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
@@ -6,6 +6,8 @@ namespace storage::spi { class Bucket; }
namespace storage {
+class ApplyBucketDiffState;
+
/*
* Interface class for syncing bucket info during merge.
*/
@@ -13,6 +15,7 @@ class MergeBucketInfoSyncer {
public:
virtual ~MergeBucketInfoSyncer() = default;
virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0;
+ virtual void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const = 0;
};
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 77e7762ec9a..c9ba43458b1 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -6,13 +6,13 @@
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
-#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
@@ -24,6 +24,7 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize,
uint32_t commonMergeChainOptimalizationMinimumSize,
bool async_apply_bucket_diff)
@@ -34,7 +35,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _async_apply_bucket_diff(async_apply_bucket_diff)
+ _async_apply_bucket_diff(async_apply_bucket_diff),
+ _executor(executor)
{
}
@@ -51,20 +53,6 @@ constexpr int getDeleteFlag() {
return 2;
}
-/**
- * Throws std::runtime_error if result has an error.
- */
-void
-checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
-{
- if (result.hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << op << " in " << bucket << ": " << result.toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-
class IteratorGuard {
spi::PersistenceProvider& _spi;
spi::IteratorId _iteratorId;
@@ -663,25 +651,28 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
- void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
- {
- for (const auto& entry : status.diff) {
- uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
- if ((entry_has_mask == 0u) ||
- (constrictHasMask && (entry_has_mask != hasMask))) {
- continue;
- }
- cmd.getDiff().emplace_back(entry);
- if (constrictHasMask) {
- cmd.getDiff().back()._entry._hasMask = newHasMask;
- } else {
- cmd.getDiff().back()._entry._hasMask = entry_has_mask;
- }
+
+void
+findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
+{
+ for (const auto& entry : status.diff) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
+ continue;
+ }
+ cmd.getDiff().emplace_back(entry);
+ if (constrictHasMask) {
+ cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
+}
+
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
MessageSender& sender, spi::Context& context,
@@ -898,7 +889,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
tracker->fail(api::ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -938,141 +930,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
namespace {
- uint8_t findOwnIndex(
- const std::vector<api::MergeBucketCommand::Node>& nodeList,
- uint16_t us)
- {
- for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
- if (nodeList[i].index == us) return i;
- }
- throw vespalib::IllegalStateException(
- "Got GetBucketDiff cmd on node not in nodelist in command",
- VESPA_STRLOC);
+uint8_t findOwnIndex(
+ const std::vector<api::MergeBucketCommand::Node>& nodeList,
+ uint16_t us)
+{
+ for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
+ if (nodeList[i].index == us) return i;
}
+ throw vespalib::IllegalStateException(
+ "Got GetBucketDiff cmd on node not in nodelist in command",
+ VESPA_STRLOC);
+}
- struct DiffEntryTimestampOrder
- : public std::binary_function<api::GetBucketDiffCommand::Entry,
- api::GetBucketDiffCommand::Entry, bool>
- {
- bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const {
- return (x._timestamp < y._timestamp);
- }
- };
-
- /**
- * Merges list A and list B together and puts the result in result.
- * Result is swapped in as last step to keep function exception safe. Thus
- * result can be listA or listB if wanted.
- *
- * listA and listB are assumed to be in the order found in the slotfile, or
- * in the order given by a previous call to this function. (In both cases
- * this will be sorted by timestamp)
- *
- * @return false if any suspect entries was found.
- */
- bool mergeLists(
- const std::vector<api::GetBucketDiffCommand::Entry>& listA,
- const std::vector<api::GetBucketDiffCommand::Entry>& listB,
- std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
- {
- bool suspect = false;
- std::vector<api::GetBucketDiffCommand::Entry> result;
- uint32_t i = 0, j = 0;
- while (i < listA.size() && j < listB.size()) {
- const api::GetBucketDiffCommand::Entry& a(listA[i]);
- const api::GetBucketDiffCommand::Entry& b(listB[j]);
- if (a._timestamp < b._timestamp) {
- result.push_back(a);
- ++i;
- } else if (a._timestamp > b._timestamp) {
- result.push_back(b);
- ++j;
- } else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
- if (!(a == b)) {
- if (a._gid == b._gid && a._flags == b._flags) {
- if ((a._flags & getDeleteFlag()) != 0 &&
- (b._flags & getDeleteFlag()) != 0)
- {
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
- LOG(debug, "Found entries with equal timestamps of "
- "the same gid who both are remove "
- "entries: %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- } else {
- LOG(error, "Found entries with equal timestamps of "
- "the same gid. This is likely same "
- "document where size of document varies:"
- " %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- }
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
- } else if ((a._flags & getDeleteFlag())
- != (b._flags & getDeleteFlag()))
+/**
+ * Merges list A and list B together and puts the result in result.
+ * Result is swapped in as last step to keep function exception safe. Thus
+ * result can be listA or listB if wanted.
+ *
+ * listA and listB are assumed to be in the order found in the slotfile, or
+ * in the order given by a previous call to this function. (In both cases
+ * this will be sorted by timestamp)
+ *
+ * @return false if any suspect entries was found.
+ */
+bool mergeLists(
+ const std::vector<api::GetBucketDiffCommand::Entry>& listA,
+ const std::vector<api::GetBucketDiffCommand::Entry>& listB,
+ std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
+{
+ bool suspect = false;
+ std::vector<api::GetBucketDiffCommand::Entry> result;
+ uint32_t i = 0, j = 0;
+ while (i < listA.size() && j < listB.size()) {
+ const api::GetBucketDiffCommand::Entry& a(listA[i]);
+ const api::GetBucketDiffCommand::Entry& b(listB[j]);
+ if (a._timestamp < b._timestamp) {
+ result.push_back(a);
+ ++i;
+ } else if (a._timestamp > b._timestamp) {
+ result.push_back(b);
+ ++j;
+ } else {
+ // If we find equal timestamped entries that are not the
+ // same.. Flag an error. But there is nothing we can do
+ // about it. Note it as if it is the same entry so we
+ // dont try to merge them.
+ if (!(a == b)) {
+ if (a._gid == b._gid && a._flags == b._flags) {
+ if ((a._flags & getDeleteFlag()) != 0 &&
+ (b._flags & getDeleteFlag()) != 0)
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
- const api::GetBucketDiffCommand::Entry& deletedEntry(
- (a._flags & getDeleteFlag()) != 0 ? a : b);
- result.push_back(deletedEntry);
- LOG(debug,
- "Found put and remove on same timestamp. Keeping"
- "remove as it is likely caused by remove with "
- "copies unavailable at the time: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
+ LOG(debug, "Found entries with equal timestamps of "
+ "the same gid who both are remove "
+ "entries: %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
} else {
- LOG(error, "Found entries with equal timestamps that "
- "weren't the same entry: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
+ LOG(error, "Found entries with equal timestamps of "
+ "the same gid. This is likely same "
+ "document where size of document varies:"
+ " %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
}
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
+ suspect = true;
+ } else if ((a._flags & getDeleteFlag())
+ != (b._flags & getDeleteFlag()))
+ {
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
+ const api::GetBucketDiffCommand::Entry& deletedEntry(
+ (a._flags & getDeleteFlag()) != 0 ? a : b);
+ result.push_back(deletedEntry);
+ LOG(debug,
+ "Found put and remove on same timestamp. Keeping"
+ "remove as it is likely caused by remove with "
+ "copies unavailable at the time: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
} else {
+ LOG(error, "Found entries with equal timestamps that "
+ "weren't the same entry: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
result.push_back(a);
result.back()._hasMask |= b._hasMask;
+ suspect = true;
}
- ++i;
- ++j;
+ } else {
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
}
+ ++i;
+ ++j;
}
- if (i < listA.size()) {
- assert(j >= listB.size());
- for (uint32_t n = listA.size(); i<n; ++i) {
- result.push_back(listA[i]);
- }
- } else if (j < listB.size()) {
- assert(i >= listA.size());
- for (uint32_t n = listB.size(); j<n; ++j) {
- result.push_back(listB[j]);
- }
+ }
+ if (i < listA.size()) {
+ assert(j >= listB.size());
+ for (uint32_t n = listA.size(); i<n; ++i) {
+ result.push_back(listA[i]);
+ }
+ } else if (j < listB.size()) {
+ assert(i >= listA.size());
+ for (uint32_t n = listB.size(); j<n; ++j) {
+ result.push_back(listB[j]);
}
- result.swap(finalResult);
- return !suspect;
}
+ result.swap(finalResult);
+ return !suspect;
+}
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
-{
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const {
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ return handleGetBucketDiffStage2(cmd, std::move(tracker));
+}
+MessageTracker::UP
+MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
+{
+ spi::Bucket bucket(cmd.getBucket());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket.");
return tracker;
@@ -1249,7 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1357,7 +1344,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
@@ -1452,4 +1439,11 @@ MergeHandler::configure(bool async_apply_bucket_diff) noexcept
_async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
}
+void
+MergeHandler::schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState> state) const
+{
+ auto bucket_id = state->get_bucket().getBucketId();
+ _executor.execute(bucket_id.getId(), [state = std::move(state)]() { });
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 17cfb847d2c..4daec4c0689 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -22,6 +22,8 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
+namespace vespalib { class ISequencedTaskExecutor; }
+
namespace storage {
namespace spi {
@@ -45,6 +47,7 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
+ vespalib::ISequencedTaskExecutor& executor,
uint32_t maxChunkSize = 4190208,
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
@@ -67,6 +70,7 @@ public:
spi::Context& context,
std::shared_ptr<ApplyBucketDiffState> async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
+ void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
@@ -85,7 +89,9 @@ private:
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
std::atomic<bool> _async_apply_bucket_diff;
+ vespalib::ISequencedTaskExecutor& _executor;
+ MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index aa1a9c136fd..d03c9a6d111 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -17,7 +17,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock,
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize,
cfg.asyncApplyBucketDiff),
@@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::REVERT_ID:
return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ce424f0ce83..9ccd901744b 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
return checkResult(_impl.destroyIterator(iteratorId, context));
}
-spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
- return checkResult(_impl.createBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
onComplete->addResultHandler(this);
- _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
+ _impl.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index c9d2411e372..14d20cf8a52 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -49,7 +49,6 @@ public:
spi::Context &context) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -63,7 +62,8 @@ public:
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4fe207e2e5..9a7a451b906 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t
}
MessageTracker::UP
-SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.visit);
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;