summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-17 20:11:09 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-18 12:53:04 +0000
commitb2ad54a6e83481106835bd27cfdd0d7b60d839a0 (patch)
treec062ff1479e9d49398a2b61fc8596fcaa7d3fd43 /storage
parent42e8b5ed9678e76b33fb861c09a65fc8ef52bfa7 (diff)
Implement async delete bucket.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp19
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp83
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h5
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp74
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h2
10 files changed, 107 insertions, 93 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index a174d305c27..5f1acf6e7da 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -22,6 +22,15 @@
} \
}
+#define CHECK_ERROR_ASYNC(className, failType, onError) \
+ { \
+ Guard guard(_lock); \
+ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
+ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
+ return; \
+ } \
+ }
+
namespace storage {
namespace {
@@ -172,13 +181,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
return _spi.destroyIterator(iterId, context);
}
-spi::Result
-PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket,
- spi::Context& context)
+void
+PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
+ spi::OperationComplete::UP operationComplete)
{
LOG_SPI("deleteBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET);
- return _spi.deleteBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
+ _spi.deleteBucketAsync(bucket, context, std::move(operationComplete));
}
spi::Result
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index d90fa7b2eaa..64828a2a3ee 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -103,7 +103,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index c51357cacd1..c3caac7121c 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -66,9 +66,9 @@ public:
return PersistenceProviderWrapper::createBucket(bucket, ctx);
}
- spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
++_deleteBucketInvocations;
- return PersistenceProviderWrapper::deleteBucket(bucket, ctx);
+ PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
};
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index d2bfabd2950..f2ed1922b60 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -91,7 +91,19 @@ private:
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
};
+bool
+bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
+ // Don't check document sizes, as background moving of documents in Proton
+ // may trigger a change in size without any mutations taking place. This will
+ // only take place when a document being moved was fed _prior_ to the change
+ // where Proton starts reporting actual document sizes, and will eventually
+ // converge to a stable value. But for now, ignore it to prevent false positive
+ // error logs and non-deleted buckets.
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
+}
+
}
+
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
@@ -142,6 +154,45 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ) {
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exist() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
{
trackerUP->setMetric(_env._metrics.setBucketStates);
@@ -154,9 +205,8 @@ AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrack
auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
if (tracker->checkForError(*response)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(),
- "handleSetBucketState");
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState");
if (entry.exist()) {
entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
@@ -273,4 +323,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra
return true;
}
+bool
+AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
+{
+ spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
+ if (result.hasError()) {
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
+ return false;
+ }
+ api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
+ // Don't check meta fields or active/ready fields since these are not
+ // that important and ready may change under the hood in a race with
+ // getModifiedBuckets(). If bucket is empty it means it has already
+ // been deleted by a racing split/join.
+ if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
+ LOG(error,
+ "Service layer bucket database and provider out of sync before "
+ "deleting bucket %s! Service layer db had %s while provider says "
+ "bucket has %s. Deletion has been rejected to ensure data is not "
+ "lost, but bucket may remain out of sync until service has been "
+ "restarted.",
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
+ return false;
+ }
+ return true;
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index bf37becb2c3..4f5c242570c 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -29,8 +29,10 @@ public:
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 3d9b359f506..297185ac54c 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -45,7 +45,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::CREATEBUCKET_ID:
return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ab5066576fd..73033132e5d 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -131,10 +131,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont
return checkResult(_impl.createBucket(bucket, context));
}
-spi::Result
-ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.deleteBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 9361cd1d19d..6e7986ad65c 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,7 +41,7 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
@@ -54,7 +54,6 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -67,6 +66,8 @@ public:
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4c09f09e63..b4fe207e2e5 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa
return document::FieldSet::SP();
}
-bool
-bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
- // Don't check document sizes, as background moving of documents in Proton
- // may trigger a change in size without any mutations taking place. This will
- // only take place when a document being moved was fed _prior_ to the change
- // where Proton starts reporting actual document sizes, and will eventually
- // converge to a stable value. But for now, ignore it to prevent false positive
- // error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
-}
}
SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
: _env(env),
@@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT
return tracker;
}
-bool
-SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
-{
- spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
- if (result.hasError()) {
- LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(), result.getErrorMessage().c_str());
- return false;
- }
- api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
- // Don't check meta fields or active/ready fields since these are not
- // that important and ready may change under the hood in a race with
- // getModifiedBuckets(). If bucket is empty it means it has already
- // been deleted by a racing split/join.
- if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
- LOG(error,
- "Service layer bucket database and provider out of sync before "
- "deleting bucket %s! Service layer db had %s while provider says "
- "bucket has %s. Deletion has been rejected to ensure data is not "
- "lost, but bucket may remain out of sync until service has been "
- "restarted.",
- bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
- return false;
- }
- return true;
-}
-
-MessageTracker::UP
-SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.deleteBuckets);
- LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
- }
- spi::Bucket bucket(cmd.getBucket());
- if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
- return tracker;
- }
- _spi.deleteBucket(bucket, tracker->context());
- StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
- {
- StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
- if (entry.exist() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- cmd.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
- }
- return tracker;
-}
-
MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 9f00f67684d..2cfbc7016c0 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -23,13 +23,11 @@ public:
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
};