diff options
Diffstat (limited to 'storage')
23 files changed, 150 insertions, 115 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d521975e0cb..a1263c9433b 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) pending_message_tracker().insert(msg); } { - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); + // TODO we might not want this particular behavior for merge operations either + MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3))); // Not blocked for exact node match. EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index e877f4601b7..971ff36c833 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) { EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1))); } +TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) { + RemoveBucketOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(1, 3))); + // In node target set + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120)); + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120)); + // Not in node target set + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120)); + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 25009156f18..f5531a134d0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { struct PendingMessage { uint32_t _msgType; + uint16_t _node; uint8_t _pri; - PendingMessage() : _msgType(UINT32_MAX), _pri(0) {} + constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {} - PendingMessage(uint32_t msgType, uint8_t pri) - : _msgType(msgType), _pri(pri) {} + constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept + : _msgType(msgType), _node(0), _pri(pri) {} - bool shouldCheck() const { return _msgType != UINT32_MAX; } + bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; } }; void enableClusterState(const lib::ClusterState& systemState) { @@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { IdealStateOperation::UP op(result.createOperation()); if (op.get()) { if (blocker.shouldCheck() - && op->shouldBlockThisOperation(blocker._msgType, - blocker._pri)) + && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri)) { return "BLOCKED"; } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index a174d305c27..5f1acf6e7da 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -22,6 +22,15 @@ } \ } +#define CHECK_ERROR_ASYNC(className, failType, onError) \ + { \ + Guard guard(_lock); \ + if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ + onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ + return; \ + } \ + } + namespace storage { namespace { @@ -172,13 +181,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, return _spi.destroyIterator(iterId, context); } -spi::Result -PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket, - spi::Context& context) +void +PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, + spi::OperationComplete::UP operationComplete) { LOG_SPI("deleteBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET); - return _spi.deleteBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); + _spi.deleteBucketAsync(bucket, context, std::move(operationComplete)); } spi::Result diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index d90fa7b2eaa..64828a2a3ee 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -103,7 +103,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index c51357cacd1..c3caac7121c 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -66,9 +66,9 @@ public: return PersistenceProviderWrapper::createBucket(bucket, ctx); } - spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { ++_deleteBucketInvocations; - return PersistenceProviderWrapper::deleteBucket(bucket, ctx); + PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index fbe1c142b09..7f66d1effd5 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() { } bool -GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { +GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const { return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 2e010a61bde..f51739242b7 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -21,7 +21,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "garbagecollection"; }; Type getType() const override { return GARBAGE_COLLECTION; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index b1231fafcd9..744b24b593e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -138,8 +138,7 @@ public: bool check(uint32_t messageType, uint16_t node, uint8_t priority) override { - (void) node; - if (op.shouldBlockThisOperation(messageType, priority)) { + if (op.shouldBlockThisOperation(messageType, node, priority)) { blocked = true; return false; } @@ -232,6 +231,7 @@ IdealStateOperation::toString() const bool IdealStateOperation::shouldBlockThisOperation(uint32_t messageType, + [[maybe_unused]] uint16_t node, uint8_t) const { for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index d4dc4e405df..f8f35afe821 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -217,7 +217,7 @@ public: /** * Should return true if the given message type should block this operation. */ - virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const; + virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const; protected: friend struct IdealStateManagerTest; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7cfe4172b2c..f951a880e5d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{ } -bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const { +bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const { for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) { if (messageType == blocking_type) { return true; } } - return IdealStateOperation::shouldBlockThisOperation(messageType, pri); + return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri); } bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 1bca1f7389f..832c0f99681 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -48,7 +48,7 @@ public: const document::BucketId&, MergeLimiter&, std::vector<MergeMetaData>&); - bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; + bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override; bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; private: static void addIdealNodes( diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 9a57722dc7e..25cae5b9979 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender) bool RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg) { - api::DeleteBucketReply* rep = - dynamic_cast<api::DeleteBucketReply*>(msg.get()); + auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get()); uint16_t node = _tracker.handleReply(*rep); @@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha } bool -RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const +RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const { - return true; + // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded + // number in the worst case), so a simple linear scan suffices. + for (uint16_t node : getNodes()) { + if (target_node == node) { + return true; + } + } + return false; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h index a0d496f948a..5e0922d5685 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h @@ -30,7 +30,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "remove"; }; Type getType() const override { return DELETE_BUCKET; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 649503cf0f5..6f3924535ef 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op bool SplitOperation::shouldBlockThisOperation(uint32_t msgType, + [[maybe_unused]] uint16_t node, uint8_t pri) const { if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index ee957309088..6a268155fc8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -21,7 +21,7 @@ public: const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override; - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index d2bfabd2950..d150f5600e5 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -91,7 +91,19 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; +bool +bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { + // Don't check document sizes, as background moving of documents in Proton + // may trigger a change in size without any mutations taking place. This will + // only take place when a document being moved was fed _prior_ to the change + // where Proton starts reporting actual document sizes, and will eventually + // converge to a stable value. But for now, ignore it to prevent false positive + // error logs and non-deleted buckets. + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); +} + } + AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, @@ -142,6 +154,47 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { + return tracker; + } + + auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exist() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } + tracker->sendReply(); + }); + _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return tracker; +} + +MessageTracker::UP AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const { trackerUP->setMetric(_env._metrics.setBucketStates); @@ -154,9 +207,8 @@ AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrack auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { if (tracker->checkForError(*response)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(), - "handleSetBucketState"); + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState"); if (entry.exist()) { entry->info.setActive(newState == spi::BucketInfo::ACTIVE); notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); @@ -273,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra return true; } +bool +AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const +{ + spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); + if (result.hasError()) { + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); + return false; + } + api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); + // Don't check meta fields or active/ready fields since these are not + // that important and ready may change under the hood in a race with + // getModifiedBuckets(). If bucket is empty it means it has already + // been deleted by a racing split/join. + if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { + LOG(error, + "Service layer bucket database and provider out of sync before " + "deleting bucket %s! Service layer db had %s while provider says " + "bucket has %s. Deletion has been rejected to ensure data is not " + "lost, but bucket may remain out of sync until service has been " + "restarted.", + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); + return false; + } + return true; +} + } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index bf37becb2c3..4f5c242570c 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -29,8 +29,10 @@ public: MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 3d9b359f506..297185ac54c 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -45,7 +45,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::CREATEBUCKET_ID: return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ab5066576fd..73033132e5d 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -131,10 +131,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont return checkResult(_impl.createBucket(bucket, context)); } -spi::Result -ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.deleteBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 9361cd1d19d..6e7986ad65c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,7 +41,7 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; @@ -54,7 +54,6 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; spi::Result createBucket(const spi::Bucket&, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -67,6 +66,8 @@ public: void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4c09f09e63..b4fe207e2e5 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa return document::FieldSet::SP(); } -bool -bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { - // Don't check document sizes, as background moving of documents in Proton - // may trigger a change in size without any mutations taking place. This will - // only take place when a document being moved was fed _prior_ to the change - // where Proton starts reporting actual document sizes, and will eventually - // converge to a stable value. But for now, ignore it to prevent false positive - // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); -} } SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) : _env(env), @@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT return tracker; } -bool -SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const -{ - spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); - if (result.hasError()) { - LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), result.getErrorMessage().c_str()); - return false; - } - api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); - // Don't check meta fields or active/ready fields since these are not - // that important and ready may change under the hood in a race with - // getModifiedBuckets(). If bucket is empty it means it has already - // been deleted by a racing split/join. - if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { - LOG(error, - "Service layer bucket database and provider out of sync before " - "deleting bucket %s! Service layer db had %s while provider says " - "bucket has %s. Deletion has been rejected to ensure data is not " - "lost, but bucket may remain out of sync until service has been " - "restarted.", - bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); - return false; - } - return true; -} - -MessageTracker::UP -SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.deleteBuckets); - LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); - } - spi::Bucket bucket(cmd.getBucket()); - if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { - return tracker; - } - _spi.deleteBucket(bucket, tracker->context()); - StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); - { - StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); - if (entry.exist() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - cmd.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } - } - return tracker; -} - MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 9f00f67684d..2cfbc7016c0 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -23,13 +23,11 @@ public: MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; }; |