diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-18 14:45:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-18 14:45:28 +0200 |
commit | 2e6819080e60f8128202c1f81df99e9ce6b60fc8 (patch) | |
tree | 9c1010aa461dbfaee15d708bdc63dd729b072721 /storage | |
parent | d3cce908ce4683ee35ee50ae6a8120ea3bc13045 (diff) | |
parent | d0c01828bf846d555ba648c342e58f2aa71b6cc7 (diff) |
Merge pull request #19598 from vespa-engine/balder/async-set-active-state
Make setActiveState async.
Diffstat (limited to 'storage')
7 files changed, 52 insertions, 41 deletions
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4a28e650fac..d2bfabd2950 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -3,11 +3,16 @@ #include "asynchandler.h" #include "persistenceutil.h" #include "testandsethelper.h" +#include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/log/log.h> +LOG_SETUP(".storage.persistence.asynchandler"); + namespace storage { namespace { @@ -88,10 +93,12 @@ private: } AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, + BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory) : _env(env), _spi(spi), + _bucketOwnershipNotifier(bucketOwnershipNotifier), _sequencedExecutor(executor), _bucketIdFactory(bucketIdFactory) {} @@ -135,6 +142,39 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const +{ + trackerUP->setMetric(_env._metrics.setBucketStates); + + //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); + spi::Bucket bucket(cmd.getBucket()); + bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); + + auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, + notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { + if (tracker->checkForError(*response)) { + StorBucketDatabase::WrappedEntry + entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(), + "handleSetBucketState"); + if (entry.exist()) { + entry->info.setActive(newState == spi::BucketInfo::ACTIVE); + notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); + entry.write(); + } else { + LOG(warning, "Got OK setCurrentState result from provider for %s, " + "but bucket has disappeared from service layer database", + cmd.getBucketId().toString().c_str()); + } + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); + } + tracker->sendReply(); + }); + _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +MessageTracker::UP AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 23f3605dca1..bf37becb2c3 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -14,6 +14,7 @@ namespace spi { class Context; } class PersistenceUtil; +class BucketOwnershipNotifier; /** * Handle async operations that uses a sequenced executor. @@ -21,12 +22,13 @@ class PersistenceUtil; */ class AsyncHandler : public Types { public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor, - const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, + vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: static bool tasConditionExists(const api::TestAndSetCommand & cmd); @@ -34,6 +36,7 @@ private: spi::Context & context, bool missingDocumentImpliesMatch = false) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; + BucketOwnershipNotifier & _bucketOwnershipNotifier; vespalib::ISequencedTaskExecutor & _sequencedExecutor; const document::BucketIdFactory & _bucketIdFactory; }; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 5315d3ec0bc..3d9b359f506 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -20,7 +20,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _mergeHandler(_env, provider, component.cluster_context(), _clock, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize), - _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()), + _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) { @@ -62,7 +62,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::APPLYBUCKETDIFF_ID: return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 471d3d62a35..ab5066576fd 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste return checkResult(_impl.setClusterState(bucketSpace, state)); } -spi::Result -ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) +void +ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.setActiveState(bucket, newState)); + onComplete->addResultHandler(this); + _impl.setActiveStateAsync(bucket, newState, std::move(onComplete)); } spi::BucketInfoResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index d23cce9172a..9361cd1d19d 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,7 +41,7 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 0856f45c3ff..d5b44cc1911 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -5,7 +5,6 @@ #include "bucketownershipnotifier.h" #include "splitbitdetector.h" #include "messages.h" -#include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storageapi/message/bucket.h> @@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker } MessageTracker::UP -SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.setBucketStates); - NotificationGuard notifyGuard(_bucketOwnershipNotifier); - - LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(cmd.getBucket()); - bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); - - spi::Result result(_spi.setActiveState(bucket, newState)); - if (tracker->checkForError(result)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); - if (entry.exist()) { - entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); - entry.write(); - } else { - LOG(warning, "Got OK setCurrentState result from provider for %s, " - "but bucket has disappeared from service layer database", - cmd.getBucketId().toString().c_str()); - } - - tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); - } - - return tracker; -} - -MessageTracker::UP SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.recheckBucketInfo); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h index ddfa22b154c..4521e520ee9 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.h +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -21,7 +21,6 @@ public: SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization); MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const; private: |