diff options
Diffstat (limited to 'storage/src/vespa')
29 files changed, 225 insertions, 213 deletions
diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp index d3096e6864e..618e49c4238 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp @@ -39,13 +39,13 @@ MergeBucketMetricSet::MergeBucketMetricSet(const std::string& name, metrics::Met : OperationMetricSet(name, std::move(tags), description, owner), source_only_copy_changed("source_only_copy_changed", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where source-only copy changed"), + "The number of merge operations where source-only copy changed", this), source_only_copy_delete_blocked("source_only_copy_delete_blocked", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where delete of unchanged source-only copies was blocked"), + "The number of merge operations where delete of unchanged source-only copies was blocked", this), source_only_copy_delete_failed("source_only_copy_delete_failed", {{"logdefault"},{"yamasdefault"}}, - "The number of merge operations where delete of unchanged source-only copies failed") + "The number of merge operations where delete of unchanged source-only copies failed", this) { } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index fbe1c142b09..7f66d1effd5 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() { } bool -GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { +GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const { return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 2e010a61bde..f51739242b7 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -21,7 +21,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "garbagecollection"; }; Type getType() const override { return GARBAGE_COLLECTION; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index b1231fafcd9..744b24b593e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -138,8 +138,7 @@ public: bool check(uint32_t messageType, uint16_t node, uint8_t priority) override { - (void) node; - if (op.shouldBlockThisOperation(messageType, priority)) { + if (op.shouldBlockThisOperation(messageType, node, priority)) { blocked = true; return false; } @@ -232,6 +231,7 @@ IdealStateOperation::toString() const bool IdealStateOperation::shouldBlockThisOperation(uint32_t messageType, + [[maybe_unused]] uint16_t node, uint8_t) const { for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index d4dc4e405df..f8f35afe821 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -217,7 +217,7 @@ public: /** * Should return true if the given message type should block this operation. */ - virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const; + virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const; protected: friend struct IdealStateManagerTest; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7cfe4172b2c..f951a880e5d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{ } -bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const { +bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const { for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) { if (messageType == blocking_type) { return true; } } - return IdealStateOperation::shouldBlockThisOperation(messageType, pri); + return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri); } bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 1bca1f7389f..832c0f99681 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -48,7 +48,7 @@ public: const document::BucketId&, MergeLimiter&, std::vector<MergeMetaData>&); - bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; + bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override; bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; private: static void addIdealNodes( diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 9a57722dc7e..25cae5b9979 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender) bool RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg) { - api::DeleteBucketReply* rep = - dynamic_cast<api::DeleteBucketReply*>(msg.get()); + auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get()); uint16_t node = _tracker.handleReply(*rep); @@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha } bool -RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const +RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const { - return true; + // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded + // number in the worst case), so a simple linear scan suffices. + for (uint16_t node : getNodes()) { + if (target_node == node) { + return true; + } + } + return false; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h index a0d496f948a..5e0922d5685 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h @@ -30,7 +30,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "remove"; }; Type getType() const override { return DELETE_BUCKET; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 649503cf0f5..6f3924535ef 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op bool SplitOperation::shouldBlockThisOperation(uint32_t msgType, + [[maybe_unused]] uint16_t node, uint8_t pri) const { if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index ee957309088..6a268155fc8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -21,7 +21,7 @@ public: const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override; - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4a28e650fac..d150f5600e5 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -3,11 +3,16 @@ #include "asynchandler.h" #include "persistenceutil.h" #include "testandsethelper.h" +#include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/log/log.h> +LOG_SETUP(".storage.persistence.asynchandler"); + namespace storage { namespace { @@ -86,12 +91,26 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; +bool +bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { + // Don't check document sizes, as background moving of documents in Proton + // may trigger a change in size without any mutations taking place. This will + // only take place when a document being moved was fed _prior_ to the change + // where Proton starts reporting actual document sizes, and will eventually + // converge to a stable value. But for now, ignore it to prevent false positive + // error logs and non-deleted buckets. + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); } + +} + AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, + BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory) : _env(env), _spi(spi), + _bucketOwnershipNotifier(bucketOwnershipNotifier), _sequencedExecutor(executor), _bucketIdFactory(bucketIdFactory) {} @@ -135,6 +154,79 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { + return tracker; + } + + auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exist() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } + tracker->sendReply(); + }); + _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return tracker; +} + +MessageTracker::UP +AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const +{ + trackerUP->setMetric(_env._metrics.setBucketStates); + + //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); + spi::Bucket bucket(cmd.getBucket()); + bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); + + auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, + notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { + if (tracker->checkForError(*response)) { + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState"); + if (entry.exist()) { + entry->info.setActive(newState == spi::BucketInfo::ACTIVE); + notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); + entry.write(); + } else { + LOG(warning, "Got OK setCurrentState result from provider for %s, " + "but bucket has disappeared from service layer database", + cmd.getBucketId().toString().c_str()); + } + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); + } + tracker->sendReply(); + }); + _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +MessageTracker::UP AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; @@ -233,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra return true; } +bool +AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const +{ + spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); + if (result.hasError()) { + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); + return false; + } + api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); + // Don't check meta fields or active/ready fields since these are not + // that important and ready may change under the hood in a race with + // getModifiedBuckets(). If bucket is empty it means it has already + // been deleted by a racing split/join. + if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { + LOG(error, + "Service layer bucket database and provider out of sync before " + "deleting bucket %s! Service layer db had %s while provider says " + "bucket has %s. Deletion has been rejected to ensure data is not " + "lost, but bucket may remain out of sync until service has been " + "restarted.", + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); + return false; + } + return true; +} + } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 23f3605dca1..4f5c242570c 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -14,6 +14,7 @@ namespace spi { class Context; } class PersistenceUtil; +class BucketOwnershipNotifier; /** * Handle async operations that uses a sequenced executor. @@ -21,19 +22,23 @@ class PersistenceUtil; */ class AsyncHandler : public Types { public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor, - const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, + vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; + BucketOwnershipNotifier & _bucketOwnershipNotifier; vespalib::ISequencedTaskExecutor & _sequencedExecutor; const document::BucketIdFactory & _bucketIdFactory; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 0dcb8539bff..70ed9845cb0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -200,13 +200,13 @@ public: virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0; /** - * Returns the reference to the current merge status for the given bucket. + * Returns a shared pointer to the current merge status for the given bucket. * This allows unlocked access to an internal variable, so users should * first check that noone else is using it by calling isMerging() first. * * @param bucket The bucket to start merging. */ - virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0; + virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0; /** * Returns true if the bucket is currently being merged on this node. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 62f9fa12a21..e395a7df9e0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_ _mergeStates[bucket] = status; } -MergeStatus& +std::shared_ptr<MergeStatus> FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { std::lock_guard mlock(_mergeStatesLock); @@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) if ( ! status ) { throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } - return *status; + return status; } bool diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index a3dc316cdde..5f212b18a7f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -218,7 +218,7 @@ public: } void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override; - MergeStatus& editMergeStatus(const document::Bucket&) override; + std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override; bool isMerging(const document::Bucket&) const override; void clearMergeStatus(const document::Bucket& bucket) override; void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index ddcab0f3659..2ffb827accf 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -10,7 +10,6 @@ #include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/common/hostreporter/hostinfo.h> #include <vespa/storage/common/messagebucket.h> -#include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/persistencehandler.h> diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7693156ae30..963fddd9fb5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -23,13 +23,15 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize) + uint32_t commonMergeChainOptimalizationMinimumSize, + bool async_apply_bucket_diff) : _clock(clock), _cluster_context(cluster_context), _env(env), _spi(spi), _maxChunkSize(maxChunkSize), - _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize) + _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), + _async_apply_bucket_diff(async_apply_bucket_diff) { } @@ -1136,49 +1138,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } api::StorageReply::SP replyToSend; bool clearState = true; try { - if (s.isFirstNode()) { + if (s->isFirstNode()) { if (reply.getResult().failed()) { // We failed, so we should reply to the pending message. - replyToSend = s.reply; + replyToSend = s->reply; } else { // If we didn't fail, reply should have good content // Sanity check for nodes assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s.diff.size() == 0); - s.diff.insert(s.diff.end(), + assert(s->diff.size() == 0); + s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. clearState = false; } else { _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( - s.startTime.getElapsedTimeAsDouble()); + s->startTime.getElapsedTimeAsDouble()); } } } else { // Exists in send on list, send on! - replyToSend = s.pendingGetDiff; + replyToSend = s->pendingGetDiff; LOG(spam, "Received GetBucketDiffReply for %s with diff of " "size %zu. Sending it on.", bucket.toString().c_str(), reply.getDiff().size()); - s.pendingGetDiff->getDiff().swap(reply.getDiff()); + s->pendingGetDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( @@ -1282,8 +1284,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const { + (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); ApplyBucketDiffState async_results(*this, bucket); @@ -1296,11 +1299,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); - if (s.pendingId != reply.getMsgId()) { + auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); + if (s->pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", - bucket.toString().c_str(), reply.getMsgId(), s.pendingId); + bucket.toString().c_str(), reply.getMsgId(), s->pendingId); return; } bool clearState = true; @@ -1315,12 +1318,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex); if (applyDiffNeedLocalData(diff, index, false)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, diff, index, s.context); + fetchLocalData(bucket, diff, index, s->context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - applyDiffLocally(bucket, diff, index, s.context, async_results); + applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); async_results.check(); async_results.sync_bucket_info(); @@ -1332,50 +1335,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } - if (s.isFirstNode()) { + if (s->isFirstNode()) { uint16_t hasMask = 0; for (uint16_t i=0; i<reply.getNodes().size(); ++i) { hasMask |= (1 << i); } - const size_t diffSizeBefore = s.diff.size(); - const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes()); + const size_t diffSizeBefore = s->diff.size(); + const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes()); if (reply.getResult().success() - && s.diff.size() == diffSizeBefore + && s->diff.size() == diffSizeBefore && !altered) { std::string msg( vespalib::make_string( "Completed merge cycle without fixing " "any entries (merge state diff at %zu entries)", - s.diff.size())); + s->diff.size())); returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg); LOG(warning, "Got reply indicating merge cycle did not fix any entries: %s", reply.toString(true).c_str()); LOG(warning, "Merge state for which there was no progress across a full merge cycle: %s", - s.toString().c_str()); + s->toString().c_str()); } if (returnCode.failed()) { // Should reply now, since we failed. - replyToSend = s.reply; + replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, s, sender, s.context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); } } } else { - replyToSend = s.pendingApplyDiff; + replyToSend = s->pendingApplyDiff; LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.", bucket.toString().c_str()); - s.pendingApplyDiff->getDiff().swap(reply.getDiff()); + s->pendingApplyDiff->getDiff().swap(reply.getDiff()); } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index fa7e21dae78..0ff8f3c0ef8 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -45,7 +45,8 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, uint32_t maxChunkSize = 4190208, - uint32_t commonMergeChainOptimalizationMinimumSize = 64); + uint32_t commonMergeChainOptimalizationMinimumSize = 64, + bool async_apply_bucket_diff = false); bool buildBucketInfoList( const spi::Bucket& bucket, @@ -68,7 +69,7 @@ public: MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; - void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const; + void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; private: const framework::Clock &_clock; @@ -77,6 +78,7 @@ private: spi::PersistenceProvider &_spi; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; + const bool _async_apply_bucket_diff; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 5315d3ec0bc..1ef883fc810 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -19,8 +19,9 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _processAllHandler(_env, provider), _mergeHandler(_env, provider, component.cluster_context(), _clock, cfg.bucketMergeChunkSize, - cfg.commonMergeChainOptimalizationMinimumSize), - _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()), + cfg.commonMergeChainOptimalizationMinimumSize, + cfg.asyncApplyBucketDiff), + _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) { @@ -45,7 +46,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::CREATEBUCKET_ID: return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: @@ -62,7 +63,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::APPLYBUCKETDIFF_ID: return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: @@ -87,19 +88,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr return MessageTracker::UP(); } -void -PersistenceHandler::handleReply(api::StorageReply& reply) const +MessageTracker::UP +PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const { switch (reply.getType().getId()) { case api::MessageType::GETBUCKETDIFF_REPLY_ID: _mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler); + _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker)); break; default: break; } + return tracker; } MessageTracker::UP @@ -112,7 +114,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP try{ LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - handleReply(static_cast<api::StorageReply&>(msg)); + return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker)); } catch (std::exception& e) { // It's a reply, so nothing we can do. LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a800d1d4053..a92c2dc78ca 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -38,7 +38,7 @@ public: private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; - void handleReply(api::StorageReply&) const; + MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const; MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 471d3d62a35..ce424f0ce83 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste return checkResult(_impl.setClusterState(bucketSpace, state)); } -spi::Result -ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) +void +ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.setActiveState(bucket, newState)); + onComplete->addResultHandler(this); + _impl.setActiveStateAsync(bucket, newState, std::move(onComplete)); } spi::BucketInfoResult @@ -72,32 +73,6 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const return checkResult(_impl.getBucketInfo(bucket)); } -spi::Result -ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context) -{ - return checkResult(_impl.put(bucket, ts, std::move(doc), context)); -} - -spi::RemoveResult -ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) -{ - return checkResult(_impl.remove(bucket, ts, docId, context)); -} - -spi::RemoveResult -ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, - const document::DocumentId& docId, spi::Context& context) -{ - return checkResult(_impl.removeIfFound(bucket, ts, docId, context)); -} - -spi::UpdateResult -ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, - spi::DocumentUpdateSP docUpdate, spi::Context& context) -{ - return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context)); -} - spi::GetResult ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, const document::DocumentId& docId, spi::Context& context) const @@ -130,10 +105,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont return checkResult(_impl.createBucket(bucket, context)); } -spi::Result -ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.deleteBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index d23cce9172a..c9d2411e372 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,12 +41,8 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; + spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; - spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; - spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions, @@ -54,7 +50,6 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; spi::Result createBucket(const spi::Bucket&, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -67,6 +62,8 @@ public: void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4c09f09e63..b4fe207e2e5 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa return document::FieldSet::SP(); } -bool -bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { - // Don't check document sizes, as background moving of documents in Proton - // may trigger a change in size without any mutations taking place. This will - // only take place when a document being moved was fed _prior_ to the change - // where Proton starts reporting actual document sizes, and will eventually - // converge to a stable value. But for now, ignore it to prevent false positive - // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); -} } SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) : _env(env), @@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT return tracker; } -bool -SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const -{ - spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); - if (result.hasError()) { - LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), result.getErrorMessage().c_str()); - return false; - } - api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); - // Don't check meta fields or active/ready fields since these are not - // that important and ready may change under the hood in a race with - // getModifiedBuckets(). If bucket is empty it means it has already - // been deleted by a racing split/join. - if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { - LOG(error, - "Service layer bucket database and provider out of sync before " - "deleting bucket %s! Service layer db had %s while provider says " - "bucket has %s. Deletion has been rejected to ensure data is not " - "lost, but bucket may remain out of sync until service has been " - "restarted.", - bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); - return false; - } - return true; -} - -MessageTracker::UP -SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.deleteBuckets); - LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); - } - spi::Bucket bucket(cmd.getBucket()); - if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { - return tracker; - } - _spi.deleteBucket(bucket, tracker->context()); - StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); - { - StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); - if (entry.exist() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - cmd.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } - } - return tracker; -} - MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 9f00f67684d..2cfbc7016c0 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -23,13 +23,11 @@ public: MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; }; diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 0856f45c3ff..d5b44cc1911 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -5,7 +5,6 @@ #include "bucketownershipnotifier.h" #include "splitbitdetector.h" #include "messages.h" -#include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storageapi/message/bucket.h> @@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker } MessageTracker::UP -SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.setBucketStates); - NotificationGuard notifyGuard(_bucketOwnershipNotifier); - - LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(cmd.getBucket()); - bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); - - spi::Result result(_spi.setActiveState(bucket, newState)); - if (tracker->checkForError(result)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); - if (entry.exist()) { - entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); - entry.write(); - } else { - LOG(warning, "Got OK setCurrentState result from provider for %s, " - "but bucket has disappeared from service layer database", - cmd.getBucketId().toString().c_str()); - } - - tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); - } - - return tracker; -} - -MessageTracker::UP SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.recheckBucketInfo); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h index ddfa22b154c..4521e520ee9 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.h +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -21,7 +21,6 @@ public: SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization); MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const; private: diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index fa9ab22c1cb..a17c77f6ca4 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -72,6 +72,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) : metrics::MetricSet("mergethrottler", {}, "", owner), averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this), queueSize("queuesize", {}, "Length of merge queue", this), + active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this), bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) @@ -366,6 +367,7 @@ MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter) LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str()); _merges.erase(mergeIter); + update_active_merge_window_size_metric(); } api::StorageMessage::SP @@ -815,6 +817,7 @@ MergeThrottler::processNewMergeCommand( // merge throttling window. assert(_merges.find(mergeCmd.getBucket()) == _merges.end()); auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first; + update_active_merge_window_size_metric(); LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); @@ -1247,6 +1250,11 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) } void +MergeThrottler::update_active_merge_window_size_metric() noexcept { + _metrics->active_window_size.set(static_cast<int64_t>(_merges.size())); +} + +void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 9b0fb125b2f..997477a4b70 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -64,6 +64,7 @@ public: public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongValueMetric queueSize; + metrics::LongValueMetric active_window_size; metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -388,6 +389,8 @@ private: void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion); void markActiveMergesAsAborted(uint32_t minimumStateVersion); + void update_active_merge_window_size_metric() noexcept; + // const function, but metrics are mutable void updateOperationMetrics( const api::ReturnCode& result, |