diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-17 09:56:08 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-17 09:56:08 +0000 |
commit | d0c01828bf846d555ba648c342e58f2aa71b6cc7 (patch) | |
tree | 134f0aaa67d6e6d678970db09d222aa6edcf64d6 | |
parent | 4d60d2845dba1fcfb2b68bd64a616d50578bf88c (diff) |
Make setActiveState async.
23 files changed, 101 insertions, 76 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 66f03edafa2..ff9676da7d0 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -405,9 +405,8 @@ DummyPersistence::setClusterState(BucketSpace bucketSpace, const ClusterState& c return Result(); } -Result -DummyPersistence::setActiveState(const Bucket& b, - BucketInfo::ActiveState newState) +void +DummyPersistence::setActiveStateAsync(const Bucket& b, BucketInfo::ActiveState newState, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "setCurrentState(%s, %s)", @@ -416,11 +415,12 @@ DummyPersistence::setActiveState(const Bucket& b, assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); BucketContentGuard::UP bc(acquireBucketWithLock(b)); - if (!bc.get()) { - return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); + if ( ! bc ) { + onComplete->onComplete(std::make_unique<BucketInfoResult>(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found")); + } else { + (*bc)->setActive(newState == BucketInfo::ACTIVE); + onComplete->onComplete(std::make_unique<Result>()); } - (*bc)->setActive(newState == BucketInfo::ACTIVE); - return Result(); } BucketInfoResult diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 486f4cec2f2..9d93316d382 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -155,7 +155,7 @@ public: BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override; - Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override; BucketInfoResult getBucketInfo(const Bucket&) const override; Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index 23a8f600024..e423e0aaac5 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,4 +30,9 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } +void +AbstractPersistenceProvider::setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP op) { + op->onComplete(std::make_unique<Result>()); +} + } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 2332c05b57f..5f8cf2fc171 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -43,7 +43,7 @@ public: /** * Default impl empty. */ - Result setActiveState(const Bucket&, BucketInfo::ActiveState) override { return Result(); } + void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) override; /** * Default impl empty. */ diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 575a95269c5..bb819fc9e50 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -9,6 +9,14 @@ namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; Result +PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveState activeState) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + setActiveStateAsync(bucket, activeState, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 56ef21b5c77..45ca49435a7 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -86,7 +86,8 @@ struct PersistenceProvider * other buckets may be deactivated, so the node must be able to serve * the data from its secondary index or get reduced coverage. */ - virtual Result setActiveState(const Bucket&, BucketInfo::ActiveState) = 0; + Result setActiveState(const Bucket&, BucketInfo::ActiveState); + virtual void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) = 0; /** * Retrieve metadata for a bucket, previously returned in listBuckets(), diff --git a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp index 487c8741a65..29748a2010c 100644 --- a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp @@ -96,7 +96,7 @@ struct Fixture BucketStateCalculator::SP _calc; test::BucketIdListResultHandler _bucketList; test::BucketInfoResultHandler _bucketInfo; - test::GenericResultHandler _genResult; + std::shared_ptr<test::GenericResultHandler> _genResult; Fixture() : _builder(), _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), @@ -107,7 +107,8 @@ struct Fixture _handler(_exec), _changedHandler(), _calc(new BucketStateCalculator()), - _bucketList(), _bucketInfo(), _genResult() + _bucketList(), _bucketInfo(), + _genResult(std::make_shared<test::GenericResultHandler>()) { // bucket 2 & 3 & 4 & 7 in ready _ready.insertDocs(_builder.createDocs(2, 1, 3). // 2 docs diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp index 2bb1eb44e25..e8cc1b54235 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp @@ -20,7 +20,7 @@ struct DummyPersistenceHandler : public IPersistenceHandler { void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {} void handleListBuckets(IBucketIdListResultHandler &) override {} void handleSetClusterState(const storage::spi::ClusterState &, IGenericResultHandler &) override {} - void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, IGenericResultHandler &) override {} + void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, std::shared_ptr<IGenericResultHandler>) override {} void handleGetBucketInfo(const storage::spi::Bucket &, IBucketInfoResultHandler &) override {} void handleCreateBucket(FeedToken, const storage::spi::Bucket &) override {} void handleDeleteBucket(FeedToken, const storage::spi::Bucket &) override {} @@ -44,8 +44,6 @@ DummyPersistenceHandler::SP handler_b(std::make_shared<DummyPersistenceHandler>( DummyPersistenceHandler::SP handler_c(std::make_shared<DummyPersistenceHandler>()); DummyPersistenceHandler::SP handler_a_new(std::make_shared<DummyPersistenceHandler>()); - - void assertHandler(const IPersistenceHandler::SP & lhs, const IPersistenceHandler * rhs) { diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 9613c505f77..c252c89a2f8 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -227,10 +227,10 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { } void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override { + std::shared_ptr<IGenericResultHandler> resultHandler) override { lastBucket = bucket; lastBucketState = newState; - resultHandler.handle(bucketStateResult); + resultHandler->handle(bucketStateResult); } void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override { diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h index b4544868bbe..b393a85f632 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h @@ -53,7 +53,7 @@ public: virtual void handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) = 0; + std::shared_ptr<IGenericResultHandler> resultHandler) = 0; virtual void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) = 0; virtual void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 136d95a068b..cb26e80b3ff 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -306,20 +306,21 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & } -Result -PersistenceEngine::setActiveState(const Bucket& bucket, - storage::spi::BucketInfo::ActiveState newState) +void +PersistenceEngine::setActiveStateAsync(const Bucket & bucket, BucketInfo::ActiveState newState, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); - for (; snap.handlers().valid(); snap.handlers().next()) { + auto resultHandler = std::make_shared<GenericResultHandler>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleSetActiveState(bucket, newState, resultHandler); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleSetActiveState(bucket, newState, resultHandler); + } else { + handler->handleSetActiveState(bucket, newState, std::move(resultHandler)); + } } - return *futureResult.get(); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 0aeb3e16351..e131cb13ae1 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -103,7 +103,7 @@ public: Result initialize() override; BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override; - Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override; BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp index d6602e18c81..c15be9336fe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp @@ -98,10 +98,10 @@ BucketHandler::handleListBuckets(IBucketIdListResultHandler &resultHandler) void BucketHandler::handleSetCurrentState(const BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) + std::shared_ptr<IGenericResultHandler> resultHandlerSP) { - _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandlerP = &resultHandler]() { - performSetCurrentState(bucketId, newState, resultHandlerP); + _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandler = std::move(resultHandlerSP)]() { + performSetCurrentState(bucketId, newState, resultHandler.get()); })); } diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h index 927744e1b8e..7f44d2ebd71 100644 --- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h @@ -55,7 +55,7 @@ public: void handleListBuckets(IBucketIdListResultHandler &resultHandler); void handleSetCurrentState(const document::BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler); + std::shared_ptr<IGenericResultHandler> resultHandler); void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler); void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler); diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index 3d464cced5b..bec9197501b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -69,9 +69,9 @@ PersistenceHandlerProxy::handleSetClusterState(const storage::spi::ClusterState void PersistenceHandlerProxy::handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) + std::shared_ptr<IGenericResultHandler> resultHandler) { - _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, resultHandler); + _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, std::move(resultHandler)); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index 96bfbe18423..f4d6175391c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -40,7 +40,7 @@ public: void handleSetClusterState(const storage::spi::ClusterState &calc, IGenericResultHandler &resultHandler) override; void handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override; + std::shared_ptr<IGenericResultHandler> resultHandler) override; void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) override; void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) override; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4a28e650fac..d2bfabd2950 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -3,11 +3,16 @@ #include "asynchandler.h" #include "persistenceutil.h" #include "testandsethelper.h" +#include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/log/log.h> +LOG_SETUP(".storage.persistence.asynchandler"); + namespace storage { namespace { @@ -88,10 +93,12 @@ private: } AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, + BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory) : _env(env), _spi(spi), + _bucketOwnershipNotifier(bucketOwnershipNotifier), _sequencedExecutor(executor), _bucketIdFactory(bucketIdFactory) {} @@ -135,6 +142,39 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const +{ + trackerUP->setMetric(_env._metrics.setBucketStates); + + //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); + spi::Bucket bucket(cmd.getBucket()); + bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); + + auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, + notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { + if (tracker->checkForError(*response)) { + StorBucketDatabase::WrappedEntry + entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(), + "handleSetBucketState"); + if (entry.exist()) { + entry->info.setActive(newState == spi::BucketInfo::ACTIVE); + notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); + entry.write(); + } else { + LOG(warning, "Got OK setCurrentState result from provider for %s, " + "but bucket has disappeared from service layer database", + cmd.getBucketId().toString().c_str()); + } + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); + } + tracker->sendReply(); + }); + _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +MessageTracker::UP AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 23f3605dca1..bf37becb2c3 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -14,6 +14,7 @@ namespace spi { class Context; } class PersistenceUtil; +class BucketOwnershipNotifier; /** * Handle async operations that uses a sequenced executor. @@ -21,12 +22,13 @@ class PersistenceUtil; */ class AsyncHandler : public Types { public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor, - const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, + vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: static bool tasConditionExists(const api::TestAndSetCommand & cmd); @@ -34,6 +36,7 @@ private: spi::Context & context, bool missingDocumentImpliesMatch = false) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; + BucketOwnershipNotifier & _bucketOwnershipNotifier; vespalib::ISequencedTaskExecutor & _sequencedExecutor; const document::BucketIdFactory & _bucketIdFactory; }; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 5315d3ec0bc..3d9b359f506 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -20,7 +20,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _mergeHandler(_env, provider, component.cluster_context(), _clock, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize), - _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()), + _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) { @@ -62,7 +62,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::APPLYBUCKETDIFF_ID: return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 471d3d62a35..ab5066576fd 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste return checkResult(_impl.setClusterState(bucketSpace, state)); } -spi::Result -ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) +void +ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.setActiveState(bucket, newState)); + onComplete->addResultHandler(this); + _impl.setActiveStateAsync(bucket, newState, std::move(onComplete)); } spi::BucketInfoResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index d23cce9172a..9361cd1d19d 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,7 +41,7 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 0856f45c3ff..d5b44cc1911 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -5,7 +5,6 @@ #include "bucketownershipnotifier.h" #include "splitbitdetector.h" #include "messages.h" -#include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storageapi/message/bucket.h> @@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker } MessageTracker::UP -SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.setBucketStates); - NotificationGuard notifyGuard(_bucketOwnershipNotifier); - - LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(cmd.getBucket()); - bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); - - spi::Result result(_spi.setActiveState(bucket, newState)); - if (tracker->checkForError(result)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); - if (entry.exist()) { - entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); - entry.write(); - } else { - LOG(warning, "Got OK setCurrentState result from provider for %s, " - "but bucket has disappeared from service layer database", - cmd.getBucketId().toString().c_str()); - } - - tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); - } - - return tracker; -} - -MessageTracker::UP SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.recheckBucketInfo); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h index ddfa22b154c..4521e520ee9 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.h +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -21,7 +21,6 @@ public: SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization); MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const; private: |